fix: lex sort order of listobjectversions backend.WalkVersions

Similar to:
  8e18b43116
  fix: lex sort order of listobjects backend.Walk
But now the "Versions" walk.

The original backend.WalkVersions function used the native WalkDir and ReadDir
which did not guarantee lexicographic ordering of results for cases where
including directory slash changes the sort order. This caused incorrect
paginated responses because S3 APIs require strict lexicographic ordering
where directories with trailing slashes sort correctly relative to files.
For example, dir1/a.b/ must come before dir1/a/ in the results, but
fs.WalkDir was returning them in filesystem sort order which reversed
the order due to not taking in account the trailing "/".
This commit is contained in:
Ben McClelland
2025-09-12 10:44:40 -07:00
parent 148836bb0c
commit 34da18337e
4 changed files with 696 additions and 190 deletions

View File

@@ -624,179 +624,382 @@ type GetVersionsFunc func(path, versionIdMarker string, pastVersionIdMarker *boo
// WalkVersions walks the supplied fs.FS and returns results compatible with
// ListObjectVersions action response
func WalkVersions(ctx context.Context, fileSystem fs.FS, prefix, delimiter, keyMarker, versionIdMarker string, max int, getObj GetVersionsFunc, skipdirs []string) (WalkVersioningResults, error) {
cpmap := cpMap{}
var objects []s3response.ObjectVersion
var delMarkers []types.DeleteMarkerEntry
var pastMarker bool
if keyMarker == "" {
pastMarker = true
func WalkVersions(ctx context.Context, fileSystem fs.FS, prefix, delimiter, keyMarker, versionIdMarker string, max int, getVersions GetVersionsFunc, skipdirs []string) (WalkVersioningResults, error) {
if max == 0 {
return WalkVersioningResults{}, nil
}
var nextMarker string
var nextVersionIdMarker string
var truncated bool
pastVersionIdMarker := versionIdMarker == ""
state := &versionWalkState{
ctx: ctx,
fileSystem: fileSystem,
prefix: prefix,
delimiter: delimiter,
keyMarker: keyMarker,
versionIdMarker: versionIdMarker,
max: max,
getVersions: getVersions,
skipdirs: skipdirs,
cpmap: cpMap{},
pastMarker: keyMarker == "",
pastVersionIdMarker: versionIdMarker == "",
}
err := fs.WalkDir(fileSystem, pathDot, func(path string, d fs.DirEntry, err error) error {
// Determine traversal root similar to Walk
root := pathDot
if strings.Contains(prefix, pathSeparator) {
if idx := strings.LastIndex(prefix, pathSeparator); idx > 0 {
root = prefix[:idx]
}
}
// Handle special case for prefix directories in non-delimiter mode
if delimiter == "" && prefix != "" && strings.HasSuffix(prefix, pathSeparator) {
err := state.handleVersionPrefixDirectory(prefix)
if err != nil {
return err
return WalkVersioningResults{}, err
}
if ctx.Err() != nil {
return ctx.Err()
}
if err := state.walkLexSortVersions(root); err != nil {
if errors.Is(err, fs.ErrNotExist) || errors.Is(err, syscall.ENOTDIR) {
return WalkVersioningResults{}, nil
}
// Ignore the root directory
if path == pathDot {
return nil
}
if slices.Contains(skipdirs, d.Name()) {
return fs.SkipDir
}
if !pastMarker {
if path == keyMarker {
pastMarker = true
}
if path < keyMarker {
return nil
}
}
if d.IsDir() {
// If prefix is defined and the directory does not match prefix,
// do not descend into the directory because nothing will
// match this prefix. Make sure to append the / at the end of
// directories since this is implied as a directory path name.
// If path is a prefix of prefix, then path could still be
// building to match. So only skip if path isn't a prefix of prefix
// and prefix isn't a prefix of path.
if prefix != "" &&
!strings.HasPrefix(path+pathSeparator, prefix) &&
!strings.HasPrefix(prefix, path+pathSeparator) {
return fs.SkipDir
}
// Don't recurse into subdirectories when listing with delimiter.
if delimiter == pathSeparator &&
prefix != path+pathSeparator &&
strings.HasPrefix(path+pathSeparator, prefix) {
cpmap.Add(path + pathSeparator)
return fs.SkipDir
}
res, err := getObj(path, versionIdMarker, &pastVersionIdMarker, max-len(objects)-len(delMarkers)-cpmap.Len(), d)
if err == ErrSkipObj {
return nil
}
if err != nil {
return fmt.Errorf("directory to object %q: %w", path, err)
}
objects = append(objects, res.ObjectVersions...)
delMarkers = append(delMarkers, res.DelMarkers...)
if res.Truncated {
truncated = true
nextMarker = path
nextVersionIdMarker = res.NextVersionIdMarker
return fs.SkipAll
}
return nil
}
// If object doesn't have prefix, don't include in results.
if prefix != "" && !strings.HasPrefix(path, prefix) {
return nil
}
if delimiter == "" {
// If no delimiter specified, then all files with matching
// prefix are included in results
res, err := getObj(path, versionIdMarker, &pastVersionIdMarker, max-len(objects)-len(delMarkers)-cpmap.Len(), d)
if err == ErrSkipObj {
return nil
}
if err != nil {
return fmt.Errorf("file to object %q: %w", path, err)
}
objects = append(objects, res.ObjectVersions...)
delMarkers = append(delMarkers, res.DelMarkers...)
if res.Truncated {
truncated = true
nextMarker = path
nextVersionIdMarker = res.NextVersionIdMarker
return fs.SkipAll
}
return nil
}
// Since delimiter is specified, we only want results that
// do not contain the delimiter beyond the prefix. If the
// delimiter exists past the prefix, then the substring
// between the prefix and delimiter is part of common prefixes.
//
// For example:
// prefix = A/
// delimiter = /
// and objects:
// A/file
// A/B/file
// B/C
// would return:
// objects: A/file
// common prefix: A/B/
//
// Note: No objects are included past the common prefix since
// these are all rolled up into the common prefix.
// Note: The delimiter can be anything, so we have to operate on
// the full path without any assumptions on posix directory hierarchy
// here. Usually the delimiter will be pathSeparator, but thats not required.
suffix := strings.TrimPrefix(path, prefix)
before, _, found := strings.Cut(suffix, delimiter)
if !found {
res, err := getObj(path, versionIdMarker, &pastVersionIdMarker, max-len(objects)-len(delMarkers)-cpmap.Len(), d)
if err == ErrSkipObj {
return nil
}
if err != nil {
return fmt.Errorf("file to object %q: %w", path, err)
}
objects = append(objects, res.ObjectVersions...)
delMarkers = append(delMarkers, res.DelMarkers...)
if res.Truncated {
truncated = true
nextMarker = path
nextVersionIdMarker = res.NextVersionIdMarker
return fs.SkipAll
}
return nil
}
// Common prefixes are a set, so should not have duplicates.
// These are abstractly a "directory", so need to include the
// delimiter at the end.
cpmap.Add(prefix + before + delimiter)
if (len(objects) + cpmap.Len()) == int(max) {
nextMarker = path
truncated = true
return fs.SkipAll
}
return nil
})
if err != nil {
return WalkVersioningResults{}, err
}
if !state.truncated {
state.newMarker = ""
}
return WalkVersioningResults{
CommonPrefixes: cpmap.CpArray(),
ObjectVersions: objects,
DelMarkers: delMarkers,
Truncated: truncated,
NextMarker: nextMarker,
NextVersionIdMarker: nextVersionIdMarker,
CommonPrefixes: state.cpmap.CpArray(),
ObjectVersions: state.objectVersions,
DelMarkers: state.delMarkers,
Truncated: state.truncated,
NextMarker: state.newMarker,
NextVersionIdMarker: state.newVersionIdMarker,
}, nil
}
// versionWalkState holds the state needed during version directory traversal
type versionWalkState struct {
ctx context.Context
fileSystem fs.FS
prefix string
delimiter string
keyMarker string
versionIdMarker string
max int
getVersions GetVersionsFunc
skipdirs []string
cpmap cpMap
objectVersions []s3response.ObjectVersion
delMarkers []types.DeleteMarkerEntry
pastMarker bool
pastVersionIdMarker bool
pastMax bool
newMarker string
newVersionIdMarker string
truncated bool
}
// handleVersionPrefixDirectory handles the special case where we need to include the prefix directory itself
func (v *versionWalkState) handleVersionPrefixDirectory(prefix string) error {
prefixDir := strings.TrimSuffix(prefix, pathSeparator)
name := prefixDir
if idx := strings.LastIndex(prefixDir, pathSeparator); idx >= 0 {
name = prefixDir[idx+1:]
}
if v.pastMarker || v.keyMarker == "" || prefixDir <= v.keyMarker {
if v.keyMarker == "" || prefixDir <= v.keyMarker { // equality also allowed to advance marker
if prefixDir != v.keyMarker { // avoid duplicate if marker points exactly here
err := v.addVersions(prefixDir, &fakeDirEntry{name: name, isDir: true})
if err != nil {
return err
}
}
if prefixDir == v.keyMarker {
v.pastMarker = true
}
}
}
return nil
}
// walkLexSortVersions performs lexicographically ordered traversal for versions
func (v *versionWalkState) walkLexSortVersions(dir string) error {
if v.ctx.Err() != nil {
return v.ctx.Err()
}
entries, err := buildSortedEntries(v.fileSystem, dir, v.skipdirs)
if err != nil {
return err
}
for i, entry := range entries {
if v.checkLimits() {
return v.ctx.Err()
}
if v.pastMax {
// We have more entries to process, so mark as truncated
v.truncated = true
return nil
}
if err := v.processVersionEntry(entry); err != nil {
return err
}
// After processing an entry, check if we hit the limit
if v.pastMax {
// We hit the limit. Check if there are more entries after this one
if i+1 < len(entries) {
// There are more entries in this directory
v.truncated = true
}
// This was the last entry in this directory, but there might be
// more dirs to recurse into
// Let the higher level determine if we're truly done
return nil
}
}
return nil
}
func (v *versionWalkState) processVersionEntry(entry entryWithSortKey) error {
path := entry.path
d := entry.d
if d.IsDir() {
return v.processVersionDirectory(path, d)
}
return v.processVersionFile(path, d)
}
// processVersionDirectory handles directory processing logic
func (v *versionWalkState) processVersionDirectory(path string, d fs.DirEntry) error {
dirPath := path + pathSeparator
// Check if we should skip this directory based on prefix
if shouldSkipDirectory(dirPath, v.prefix) {
return nil
}
// Handle marker logic for directories
if v.isBeforeMarker(path) { // path used (no trailing /) consistent with markers
// Before marker - only recurse if marker could be inside
if strings.HasPrefix(v.keyMarker, dirPath) {
return v.walkLexSortVersions(path)
}
return nil
}
if v.isAtMarker(path) {
// At marker - recurse but don't include as common prefix
v.pastMarker = true
return v.walkLexSortVersions(path)
}
// Apply prefix filter
if v.prefix != "" && !strings.HasPrefix(dirPath, v.prefix) {
return v.walkLexSortVersions(path)
}
if v.delimiter != "" {
return v.processVersionDirectoryWithDelimiter(path, dirPath, d)
}
return v.processVersionDirectoryWithoutDelimiter(path, dirPath, d)
}
// processVersionDirectoryWithDelimiter handles directory processing when delimiter is specified
func (v *versionWalkState) processVersionDirectoryWithDelimiter(path, dirPath string, d fs.DirEntry) error {
// Special case: if the directory path exactly matches the prefix, return it as an object
if dirPath == v.prefix {
if err := v.addVersions(path, d); err != nil {
return err
}
if v.pastMax {
return nil
}
}
// Handle delimiter logic for directories
suffix := strings.TrimPrefix(dirPath, v.prefix)
before, _, found := strings.Cut(suffix, v.delimiter)
if found {
// Directory creates a common prefix
cprefNoDelim := v.prefix + before
cpref := cprefNoDelim + v.delimiter
if v.handleCommonPrefix(cpref) {
return nil
}
if v.keyMarker != "" && strings.HasPrefix(v.keyMarker, cprefNoDelim) {
return v.walkLexSortVersions(path)
}
}
return v.walkLexSortVersions(path)
}
// processVersionDirectoryWithoutDelimiter handles directory processing when no delimiter is specified
func (v *versionWalkState) processVersionDirectoryWithoutDelimiter(path, dirPath string, d fs.DirEntry) error {
// Include directory as object if it matches prefix
if v.prefix == "" || strings.HasPrefix(dirPath, v.prefix) {
if err := v.addVersions(path, d); err != nil {
return err
}
if v.pastMax {
return nil
}
}
return v.walkLexSortVersions(path)
}
// processVersionFile handles file processing logic
func (v *versionWalkState) processVersionFile(path string, d fs.DirEntry) error {
if v.isBeforeMarker(path) {
return nil
}
if v.isAtMarker(path) {
v.pastMarker = true
// Don't return here - we need to process this object for remaining versions
}
if v.prefix != "" && !strings.HasPrefix(path, v.prefix) {
return nil
}
if v.delimiter != "" {
return v.processVersionFileWithDelimiter(path, d)
}
return v.processVersionFileWithoutDelimiter(path, d)
}
// processVersionFileWithDelimiter handles file processing when delimiter is specified
func (v *versionWalkState) processVersionFileWithDelimiter(path string, d fs.DirEntry) error {
suffix := strings.TrimPrefix(path, v.prefix)
before, _, found := strings.Cut(suffix, v.delimiter)
if !found {
// File doesn't contain delimiter after prefix - include it
if err := v.addVersions(path, d); err != nil {
return err
}
if v.pastMax {
return nil
}
return nil
}
// File contains delimiter after prefix - add to common prefixes
cprefNoDelim := v.prefix + before
cpref := cprefNoDelim + v.delimiter
if v.handleCommonPrefix(cpref) {
return nil
}
return nil
}
// processVersionFileWithoutDelimiter handles file processing when no delimiter is specified
func (v *versionWalkState) processVersionFileWithoutDelimiter(path string, d fs.DirEntry) error {
if err := v.addVersions(path, d); err != nil {
return err
}
return nil
}
// available returns remaining capacity (objects + delete markers + prefixes)
// before reaching the max listing size.
func (v *versionWalkState) available() int {
return v.max - (len(v.objectVersions) + len(v.delMarkers) + v.cpmap.Len())
}
// checkLimits returns true if we've passed the max or context cancelled.
func (v *versionWalkState) checkLimits() bool {
return v.pastMax || v.ctx.Err() != nil
}
// isBeforeMarker determines if path sorts before current key marker.
func (v *versionWalkState) isBeforeMarker(path string) bool {
return !v.pastMarker && path < v.keyMarker
}
// isAtMarker determines if path equals the current key marker.
func (v *versionWalkState) isAtMarker(path string) bool {
return !v.pastMarker && path == v.keyMarker
}
// addVersions fetches version information for a path and updates state.
func (v *versionWalkState) addVersions(path string, d fs.DirEntry) error {
if v.available() <= 0 {
v.pastMax = true
return nil
}
res, err := v.getVersions(path, v.versionIdMarker, &v.pastVersionIdMarker, v.available(), d)
if err == ErrSkipObj {
return nil
}
if err != nil {
return fmt.Errorf("versions for object %q: %w", path, err)
}
v.objectVersions = append(v.objectVersions, res.ObjectVersions...)
v.delMarkers = append(v.delMarkers, res.DelMarkers...)
if res.Truncated {
v.truncated = true
v.newMarker = path
v.newVersionIdMarker = res.NextVersionIdMarker
v.pastMax = true
return nil
}
if v.available() <= 0 {
v.newMarker = path
v.pastMax = true
}
return nil
}
// addCommonPrefix records a common prefix if capacity allows.
func (v *versionWalkState) addCommonPrefix(cpref string) {
if v.pastMax {
return
}
v.cpmap.Add(cpref)
if v.available() <= 0 {
v.newMarker = cpref
v.pastMax = true
}
}
// handleCommonPrefix processes common prefix logic for version walking
func (v *versionWalkState) handleCommonPrefix(cpref string) bool {
if v.isBeforeMarker(cpref) {
return false // Don't process this prefix
}
if v.isAtMarker(cpref) {
v.pastMarker = true
return false // Don't include as common prefix but continue processing
}
// Skip if this common prefix is <= marker (for when pastMarker is already true)
if v.keyMarker != "" && cpref <= v.keyMarker {
return false
}
if v.pastMax {
v.truncated = true
return true // Stop processing
}
v.addCommonPrefix(cpref)
return v.pastMax // Return true if we should stop processing
}

View File

@@ -419,30 +419,14 @@ func TestOrderWalk(t *testing.T) {
prefix: "dir1/",
expected: backend.WalkResults{
Objects: []s3response.Object{
{
Key: backend.GetPtrFromString("dir1/"),
},
{
Key: backend.GetPtrFromString("dir1/a.b/"),
},
{
Key: backend.GetPtrFromString("dir1/a.b/file1"),
},
{
Key: backend.GetPtrFromString("dir1/a.b/file2"),
},
{
Key: backend.GetPtrFromString("dir1/a/"),
},
{
Key: backend.GetPtrFromString("dir1/a/file1"),
},
{
Key: backend.GetPtrFromString("dir1/a/file2"),
},
{
Key: backend.GetPtrFromString("dir1/a/file3"),
},
{Key: backend.GetPtrFromString("dir1/")},
{Key: backend.GetPtrFromString("dir1/a.b/")},
{Key: backend.GetPtrFromString("dir1/a.b/file1")},
{Key: backend.GetPtrFromString("dir1/a.b/file2")},
{Key: backend.GetPtrFromString("dir1/a/")},
{Key: backend.GetPtrFromString("dir1/a/file1")},
{Key: backend.GetPtrFromString("dir1/a/file2")},
{Key: backend.GetPtrFromString("dir1/a/file3")},
},
},
},
@@ -801,3 +785,309 @@ func comparePrefixesOrdered(a, b []types.CommonPrefix) bool {
}
return true
}
// ---- Versioning Tests ----
// getVersionsTestFunc is a simple GetVersionsFunc implementation for tests that
// returns a single latest version for each file or directory encountered.
// Directories are reported with a trailing delimiter in the key to match the
// behavior of the non-versioned Walk tests where directory objects are listed.
func getVersionsTestFunc(path, versionIdMarker string, pastVersionIdMarker *bool, availableObjCount int, d fs.DirEntry) (*backend.ObjVersionFuncResult, error) {
// If we have no available slots left, signal truncation (should be rare in these tests)
if availableObjCount <= 0 {
return &backend.ObjVersionFuncResult{Truncated: true, NextVersionIdMarker: ""}, nil
}
key := path
if d.IsDir() {
key = key + "/"
}
ver := "v1"
latest := true
ov := s3response.ObjectVersion{Key: &key, VersionId: &ver, IsLatest: &latest}
return &backend.ObjVersionFuncResult{ObjectVersions: []s3response.ObjectVersion{ov}}, nil
}
// TestWalkVersions mirrors TestWalk but exercises WalkVersions and validates
// common prefixes and object versions for typical delimiter/prefix scenarios.
func TestWalkVersions(t *testing.T) {
fsys := fstest.MapFS{
"dir1/a/file1": {},
"dir1/a/file2": {},
"dir1/b/file3": {},
"rootfile": {},
}
// Without a delimiter, every directory and file becomes an object version
// via the test GetVersionsFunc (directories have trailing '/').
expected := backend.WalkVersioningResults{
ObjectVersions: []s3response.ObjectVersion{
{Key: backend.GetPtrFromString("dir1/")},
{Key: backend.GetPtrFromString("dir1/a/")},
{Key: backend.GetPtrFromString("dir1/a/file1")},
{Key: backend.GetPtrFromString("dir1/a/file2")},
{Key: backend.GetPtrFromString("dir1/b/")},
{Key: backend.GetPtrFromString("dir1/b/file3")},
{Key: backend.GetPtrFromString("rootfile")},
},
}
res, err := backend.WalkVersions(context.Background(), fsys, "", "", "", "", 1000, getVersionsTestFunc, []string{})
if err != nil {
t.Fatalf("walk versions: %v", err)
}
compareVersionResultsOrdered("simple versions no delimiter", res, expected, t)
}
// TestOrderWalkVersions mirrors TestOrderWalk, exercising ordering semantics for
// version listings (lexicographic ordering of directory and file version keys).
func TestOrderWalkVersions(t *testing.T) {
fsys := fstest.MapFS{
"dir1/a/file1": {},
"dir1/a/file2": {},
"dir1/a/file3": {},
"dir1/a.b/file1": {},
"dir1/a.b/file2": {},
}
// Expect lexicographic ordering similar to non-version walk when no delimiter.
expected := backend.WalkVersioningResults{
ObjectVersions: []s3response.ObjectVersion{
{Key: backend.GetPtrFromString("dir1/")},
{Key: backend.GetPtrFromString("dir1/a.b/")},
{Key: backend.GetPtrFromString("dir1/a.b/file1")},
{Key: backend.GetPtrFromString("dir1/a.b/file2")},
{Key: backend.GetPtrFromString("dir1/a/")},
{Key: backend.GetPtrFromString("dir1/a/file1")},
{Key: backend.GetPtrFromString("dir1/a/file2")},
{Key: backend.GetPtrFromString("dir1/a/file3")},
},
}
res, err := backend.WalkVersions(context.Background(), fsys, "dir1/", "", "", "", 1000, getVersionsTestFunc, []string{})
if err != nil {
t.Fatalf("order walk versions: %v", err)
}
compareVersionResultsOrdered("order versions no delimiter", res, expected, t)
}
// compareVersionResults compares unordered sets of common prefixes and object versions
// compareVersionResultsOrdered compares ordered slices
func compareVersionResultsOrdered(name string, got, wanted backend.WalkVersioningResults, t *testing.T) {
if !compareObjectVersionsOrdered(got.ObjectVersions, wanted.ObjectVersions) {
t.Errorf("%v: unexpected object versions, got %v wanted %v", name, printVersionObjects(got.ObjectVersions), printVersionObjects(wanted.ObjectVersions))
}
if !comparePrefixesOrdered(got.CommonPrefixes, wanted.CommonPrefixes) {
t.Errorf("%v: unexpected prefix, got %v wanted %v", name, printCommonPrefixes(got.CommonPrefixes), printCommonPrefixes(wanted.CommonPrefixes))
}
}
func compareObjectVersionsOrdered(a, b []s3response.ObjectVersion) bool {
if len(a) == 0 && len(b) == 0 {
return true
}
if len(a) != len(b) {
return false
}
for i, ov := range a {
if ov.Key == nil || b[i].Key == nil {
return false
}
if *ov.Key != *b[i].Key {
return false
}
}
return true
}
func printVersionObjects(list []s3response.ObjectVersion) string {
res := "["
for _, ov := range list {
var key string
if ov.Key == nil {
key = "<nil>"
} else {
key = *ov.Key
}
if res == "[" {
res = res + key
} else {
res = res + ", " + key
}
}
return res + "]"
}
// multiVersionGetVersionsFunc is a more sophisticated test function that simulates
// multiple versions per object, similar to the integration test behavior.
// It creates multiple versions for each file with deterministic version IDs.
func createMultiVersionFunc(files map[string]int) backend.GetVersionsFunc {
// Pre-generate all versions for deterministic testing
versionedFiles := make(map[string][]s3response.ObjectVersion)
for path, versionCount := range files {
versions := make([]s3response.ObjectVersion, versionCount)
for i := range versionCount {
versionId := fmt.Sprintf("v%d", i+1)
isLatest := i == versionCount-1 // Last version is latest
key := path
versions[i] = s3response.ObjectVersion{
Key: &key,
VersionId: &versionId,
IsLatest: &isLatest,
}
}
// Reverse slice so latest comes first (reverse chronological order)
for i, j := 0, len(versions)-1; i < j; i, j = i+1, j-1 {
versions[i], versions[j] = versions[j], versions[i]
}
versionedFiles[path] = versions
}
return func(path, versionIdMarker string, pastVersionIdMarker *bool, availableObjCount int, d fs.DirEntry) (*backend.ObjVersionFuncResult, error) {
if availableObjCount <= 0 {
return &backend.ObjVersionFuncResult{Truncated: true}, nil
}
// Handle directories - just return a single directory version
if d.IsDir() {
key := path + "/"
ver := "v1"
latest := true
ov := s3response.ObjectVersion{Key: &key, VersionId: &ver, IsLatest: &latest}
return &backend.ObjVersionFuncResult{ObjectVersions: []s3response.ObjectVersion{ov}}, nil
}
// Get versions for this file
versions, exists := versionedFiles[path]
if !exists {
// No versions for this file, skip it
return &backend.ObjVersionFuncResult{}, backend.ErrSkipObj
}
// Handle version ID marker pagination
startIdx := 0
if versionIdMarker != "" && !*pastVersionIdMarker {
// Find the starting position after the marker
for i, version := range versions {
if *version.VersionId == versionIdMarker {
startIdx = i + 1
*pastVersionIdMarker = true
break
}
}
}
// Return available versions up to the limit
endIdx := min(startIdx+availableObjCount, len(versions))
result := &backend.ObjVersionFuncResult{
ObjectVersions: versions[startIdx:endIdx],
}
// Check if we need to truncate
if endIdx < len(versions) {
result.Truncated = true
result.NextVersionIdMarker = *versions[endIdx-1].VersionId
}
return result, nil
}
}
// TestWalkVersionsTruncated tests the pagination behavior of WalkVersions
// when there are multiple versions per object and the result is truncated.
// This mirrors the integration test ListObjectVersions_multiple_object_versions_truncated.
func TestWalkVersionsTruncated(t *testing.T) {
// Create filesystem with the same files as integration test
fsys := fstest.MapFS{
"foo": {},
"bar": {},
"baz": {},
}
// Define version counts per file (matching integration test)
versionCounts := map[string]int{
"foo": 4, // 4 versions
"bar": 3, // 3 versions
"baz": 5, // 5 versions
}
getVersionsFunc := createMultiVersionFunc(versionCounts)
// Test first page with limit of 5 (should be truncated)
maxKeys := 5
res1, err := backend.WalkVersions(context.Background(), fsys, "", "", "", "", maxKeys, getVersionsFunc, []string{})
if err != nil {
t.Fatalf("walk versions first page: %v", err)
}
// Verify first page results
if !res1.Truncated {
t.Error("expected first page to be truncated")
}
if len(res1.ObjectVersions) != maxKeys {
t.Errorf("expected %d versions in first page, got %d", maxKeys, len(res1.ObjectVersions))
}
// Expected order: bar (3 versions), baz (2 versions) - lexicographic order
expectedFirstPage := []string{"bar", "bar", "bar", "baz", "baz"}
if len(res1.ObjectVersions) != len(expectedFirstPage) {
t.Fatalf("first page length mismatch: expected %d, got %d", len(expectedFirstPage), len(res1.ObjectVersions))
}
for i, expected := range expectedFirstPage {
if res1.ObjectVersions[i].Key == nil || *res1.ObjectVersions[i].Key != expected {
t.Errorf("first page[%d]: expected key %s, got %v", i, expected, res1.ObjectVersions[i].Key)
}
}
// Verify next markers are set
if res1.NextMarker == "" {
t.Error("expected NextMarker to be set on truncated result")
}
if res1.NextVersionIdMarker == "" {
t.Error("expected NextVersionIdMarker to be set on truncated result")
}
// Test second page using markers
res2, err := backend.WalkVersions(context.Background(), fsys, "", "", res1.NextMarker, res1.NextVersionIdMarker, maxKeys, getVersionsFunc, []string{})
if err != nil {
t.Fatalf("walk versions second page: %v", err)
}
t.Logf("Second page: ObjectVersions=%d, Truncated=%v, NextMarker=%s, NextVersionIdMarker=%s",
len(res2.ObjectVersions), res2.Truncated, res2.NextMarker, res2.NextVersionIdMarker)
for i, ov := range res2.ObjectVersions {
t.Logf(" [%d] Key=%s, VersionId=%s", i, *ov.Key, *ov.VersionId)
}
// Verify second page results
// With maxKeys=5, we should have 3 pages total: 5 + 5 + 2 = 12
// Test third page if needed
var res3 backend.WalkVersioningResults
if res2.Truncated {
res3, err = backend.WalkVersions(context.Background(), fsys, "", "", res2.NextMarker, res2.NextVersionIdMarker, maxKeys, getVersionsFunc, []string{})
if err != nil {
t.Fatalf("walk versions third page: %v", err)
}
t.Logf("Third page: ObjectVersions=%d, Truncated=%v, NextMarker=%s, NextVersionIdMarker=%s",
len(res3.ObjectVersions), res3.Truncated, res3.NextMarker, res3.NextVersionIdMarker)
for i, ov := range res3.ObjectVersions {
t.Logf(" [%d] Key=%s, VersionId=%s", i, *ov.Key, *ov.VersionId)
}
}
// Verify total count across all pages
totalVersions := len(res1.ObjectVersions) + len(res2.ObjectVersions) + len(res3.ObjectVersions)
expectedTotal := versionCounts["foo"] + versionCounts["bar"] + versionCounts["baz"]
if totalVersions != expectedTotal {
t.Errorf("total versions mismatch: expected %d, got %d", expectedTotal, totalVersions)
}
}

View File

@@ -23080,7 +23080,7 @@ func ListObjectVersions_multiple_object_versions_truncated(s *S3Conf) error {
if !compareVersions(versions[:maxKeys], out.Versions) {
return fmt.Errorf("expected the resulting object versions to be %v, instead got %v",
versions[:maxKeys], out.Versions)
sprintVersions(versions[:maxKeys]), sprintVersions(out.Versions))
}
ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
@@ -23116,7 +23116,7 @@ func ListObjectVersions_multiple_object_versions_truncated(s *S3Conf) error {
if !compareVersions(versions[maxKeys:], out.Versions) {
return fmt.Errorf("expected the resulting object versions to be %v, instead got %v",
versions[maxKeys:], out.Versions)
sprintVersions(versions[:maxKeys]), sprintVersions(out.Versions))
}
return nil

View File

@@ -1839,3 +1839,16 @@ func sprintPrefixes(cpfx []types.CommonPrefix) string {
return strings.Join(names, ",")
}
func sprintVersions(objects []types.ObjectVersion) string {
if len(objects) == 0 {
return ""
}
names := make([]string, len(objects))
for i, obj := range objects {
names[i] = fmt.Sprintf("%v/%v", *obj.Key, obj.VersionId)
}
return strings.Join(names, ",")
}