fix: Changes the backend.Walk implementation to return non-truncated result if both delimiter and max-keys are provided

Fixes #816

`ListObjects(V2)` used to return truncated response, if delimiter is provided and the result is limited by max-keys and the number of common prefixes is the same as `max-keys`.
e.g
PUT -> `foo/bar`
PUT -> `foo/quxx`
LIST: `max-keys=1;delim=/` -> foo/

`ListObjects(V2)` should return `foo/` as common prefix and `truncated` should be `false`.

The PR makes this fix to return `non-truncated` response for the above described case.
This commit is contained in:
niksis02
2025-04-09 21:29:58 +04:00
parent eb72905401
commit aaee7afa98
3 changed files with 91 additions and 12 deletions

View File

@@ -44,12 +44,19 @@ func Walk(ctx context.Context, fileSystem fs.FS, prefix, delimiter, marker strin
cpmap := make(map[string]struct{})
var objects []s3response.Object
// if max is 0, it should return empty non-truncated result
if max == 0 {
return WalkResults{
Truncated: false,
}, nil
}
var pastMarker bool
if marker == "" {
pastMarker = true
}
pastMax := max == 0
var pastMax bool
var newMarker string
var truncated bool
@@ -76,14 +83,6 @@ func Walk(ctx context.Context, fileSystem fs.FS, prefix, delimiter, marker strin
return fs.SkipDir
}
if pastMax {
if len(objects) != 0 {
newMarker = *objects[len(objects)-1].Key
truncated = true
}
return fs.SkipAll
}
// After this point, return skipflag instead of nil
// so we can skip a directory without an early return
var skipflag error
@@ -116,7 +115,15 @@ func Walk(ctx context.Context, fileSystem fs.FS, prefix, delimiter, marker strin
if err != nil {
return fmt.Errorf("directory to object %q: %w", path, err)
}
if pastMax {
truncated = true
return fs.SkipAll
}
objects = append(objects, dirobj)
if (len(objects) + len(cpmap)) == int(max) {
newMarker = path
pastMax = true
}
return skipflag
}
@@ -160,9 +167,15 @@ func Walk(ctx context.Context, fileSystem fs.FS, prefix, delimiter, marker strin
if err != nil {
return fmt.Errorf("file to object %q: %w", path, err)
}
if pastMax {
truncated = true
return fs.SkipAll
}
objects = append(objects, obj)
if max > 0 && (len(objects)+len(cpmap)) == int(max) {
if (len(objects) + len(cpmap)) == int(max) {
newMarker = path
pastMax = true
}
@@ -200,8 +213,13 @@ func Walk(ctx context.Context, fileSystem fs.FS, prefix, delimiter, marker strin
if err != nil {
return fmt.Errorf("file to object %q: %w", path, err)
}
if pastMax {
truncated = true
return fs.SkipAll
}
objects = append(objects, obj)
if (len(objects) + len(cpmap)) == int(max) {
newMarker = path
pastMax = true
}
return skipflag
@@ -222,11 +240,14 @@ func Walk(ctx context.Context, fileSystem fs.FS, prefix, delimiter, marker strin
return skipflag
}
if pastMax {
truncated = true
return fs.SkipAll
}
cpmap[cpref] = struct{}{}
if (len(objects) + len(cpmap)) == int(max) {
newMarker = cpref
truncated = true
return fs.SkipAll
pastMax = true
}
return skipflag
@@ -252,6 +273,10 @@ func Walk(ctx context.Context, fileSystem fs.FS, prefix, delimiter, marker strin
})
}
if !truncated {
newMarker = ""
}
return WalkResults{
CommonPrefixes: commonPrefixes,
Objects: objects,

View File

@@ -220,6 +220,7 @@ func TestListObjects(s *S3Conf) {
ListObjects_list_all_objs(s)
ListObjects_nested_dir_file_objs(s)
ListObjects_check_owner(s)
ListObjects_non_truncated_common_prefixes(s)
//TODO: remove the condition after implementing checksums in azure
if !s.azureTests {
ListObjects_with_checksum(s)
@@ -905,6 +906,7 @@ func GetIntTests() IntTests {
"ListObjects_list_all_objs": ListObjects_list_all_objs,
"ListObjects_nested_dir_file_objs": ListObjects_nested_dir_file_objs,
"ListObjects_check_owner": ListObjects_check_owner,
"ListObjects_non_truncated_common_prefixes": ListObjects_non_truncated_common_prefixes,
"ListObjects_with_checksum": ListObjects_with_checksum,
"ListObjectsV2_start_after": ListObjectsV2_start_after,
"ListObjectsV2_both_start_after_and_continuation_token": ListObjectsV2_both_start_after_and_continuation_token,

View File

@@ -5007,6 +5007,58 @@ func ListObjects_check_owner(s *S3Conf) error {
})
}
func ListObjects_non_truncated_common_prefixes(s *S3Conf) error {
testName := "ListObjects_non_truncated_common_prefixes"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
_, err := putObjects(s3client, []string{"asdf", "boo/bar", "boo/baz/xyzzy", "cquux/thud", "cquux/bla"}, bucket)
if err != nil {
return err
}
delim, marker, maxKeys := "/", "boo/", int32(1)
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
res, err := s3client.ListObjects(ctx, &s3.ListObjectsInput{
Bucket: &bucket,
Marker: &marker,
Delimiter: &delim,
MaxKeys: &maxKeys,
})
cancel()
if err != nil {
return err
}
if res.IsTruncated == nil {
return fmt.Errorf("expected non-nil istruncated")
}
if *res.IsTruncated {
return fmt.Errorf("expected non-truncated result")
}
if res.MaxKeys == nil {
return fmt.Errorf("expected non nil max-keys")
}
if *res.MaxKeys != maxKeys {
return fmt.Errorf("expected max-keys to be %v, instead got %v", maxKeys, *res.MaxKeys)
}
if getString(res.Delimiter) != delim {
return fmt.Errorf("expected delimiter to be %v, instead got %v", delim, getString(res.Delimiter))
}
if getString(res.Marker) != marker {
return fmt.Errorf("expected marker to be %v, instead got %v", getString(res.Marker), marker)
}
if len(res.Contents) != 0 {
return fmt.Errorf("expected empty contents, instead got %+v", res.Contents)
}
cPrefs := []string{"cquux/"}
if !comparePrefixes(cPrefs, res.CommonPrefixes) {
return fmt.Errorf("expected common prefixes to be %v, instead got %+v", cPrefs, res.CommonPrefixes)
}
return nil
})
}
func ListObjectsV2_start_after(s *S3Conf) error {
testName := "ListObjectsV2_start_after"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {