diff --git a/backend/walk.go b/backend/walk.go index 227f3301..02aca3c3 100644 --- a/backend/walk.go +++ b/backend/walk.go @@ -624,179 +624,382 @@ type GetVersionsFunc func(path, versionIdMarker string, pastVersionIdMarker *boo // WalkVersions walks the supplied fs.FS and returns results compatible with // ListObjectVersions action response -func WalkVersions(ctx context.Context, fileSystem fs.FS, prefix, delimiter, keyMarker, versionIdMarker string, max int, getObj GetVersionsFunc, skipdirs []string) (WalkVersioningResults, error) { - cpmap := cpMap{} - var objects []s3response.ObjectVersion - var delMarkers []types.DeleteMarkerEntry - - var pastMarker bool - if keyMarker == "" { - pastMarker = true +func WalkVersions(ctx context.Context, fileSystem fs.FS, prefix, delimiter, keyMarker, versionIdMarker string, max int, getVersions GetVersionsFunc, skipdirs []string) (WalkVersioningResults, error) { + if max == 0 { + return WalkVersioningResults{}, nil } - var nextMarker string - var nextVersionIdMarker string - var truncated bool - pastVersionIdMarker := versionIdMarker == "" + state := &versionWalkState{ + ctx: ctx, + fileSystem: fileSystem, + prefix: prefix, + delimiter: delimiter, + keyMarker: keyMarker, + versionIdMarker: versionIdMarker, + max: max, + getVersions: getVersions, + skipdirs: skipdirs, + cpmap: cpMap{}, + pastMarker: keyMarker == "", + pastVersionIdMarker: versionIdMarker == "", + } - err := fs.WalkDir(fileSystem, pathDot, func(path string, d fs.DirEntry, err error) error { + // Determine traversal root similar to Walk + root := pathDot + if strings.Contains(prefix, pathSeparator) { + if idx := strings.LastIndex(prefix, pathSeparator); idx > 0 { + root = prefix[:idx] + } + } + + // Handle special case for prefix directories in non-delimiter mode + if delimiter == "" && prefix != "" && strings.HasSuffix(prefix, pathSeparator) { + err := state.handleVersionPrefixDirectory(prefix) if err != nil { - return err + return WalkVersioningResults{}, err } - if ctx.Err() != nil { - return ctx.Err() + } + + if err := state.walkLexSortVersions(root); err != nil { + if errors.Is(err, fs.ErrNotExist) || errors.Is(err, syscall.ENOTDIR) { + return WalkVersioningResults{}, nil } - // Ignore the root directory - if path == pathDot { - return nil - } - if slices.Contains(skipdirs, d.Name()) { - return fs.SkipDir - } - - if !pastMarker { - if path == keyMarker { - pastMarker = true - } - if path < keyMarker { - return nil - } - } - - if d.IsDir() { - // If prefix is defined and the directory does not match prefix, - // do not descend into the directory because nothing will - // match this prefix. Make sure to append the / at the end of - // directories since this is implied as a directory path name. - // If path is a prefix of prefix, then path could still be - // building to match. So only skip if path isn't a prefix of prefix - // and prefix isn't a prefix of path. - if prefix != "" && - !strings.HasPrefix(path+pathSeparator, prefix) && - !strings.HasPrefix(prefix, path+pathSeparator) { - return fs.SkipDir - } - - // Don't recurse into subdirectories when listing with delimiter. - if delimiter == pathSeparator && - prefix != path+pathSeparator && - strings.HasPrefix(path+pathSeparator, prefix) { - cpmap.Add(path + pathSeparator) - return fs.SkipDir - } - - res, err := getObj(path, versionIdMarker, &pastVersionIdMarker, max-len(objects)-len(delMarkers)-cpmap.Len(), d) - if err == ErrSkipObj { - return nil - } - if err != nil { - return fmt.Errorf("directory to object %q: %w", path, err) - } - objects = append(objects, res.ObjectVersions...) - delMarkers = append(delMarkers, res.DelMarkers...) - if res.Truncated { - truncated = true - nextMarker = path - nextVersionIdMarker = res.NextVersionIdMarker - return fs.SkipAll - } - - return nil - } - - // If object doesn't have prefix, don't include in results. - if prefix != "" && !strings.HasPrefix(path, prefix) { - return nil - } - - if delimiter == "" { - // If no delimiter specified, then all files with matching - // prefix are included in results - res, err := getObj(path, versionIdMarker, &pastVersionIdMarker, max-len(objects)-len(delMarkers)-cpmap.Len(), d) - if err == ErrSkipObj { - return nil - } - if err != nil { - return fmt.Errorf("file to object %q: %w", path, err) - } - objects = append(objects, res.ObjectVersions...) - delMarkers = append(delMarkers, res.DelMarkers...) - if res.Truncated { - truncated = true - nextMarker = path - nextVersionIdMarker = res.NextVersionIdMarker - return fs.SkipAll - } - - return nil - } - - // Since delimiter is specified, we only want results that - // do not contain the delimiter beyond the prefix. If the - // delimiter exists past the prefix, then the substring - // between the prefix and delimiter is part of common prefixes. - // - // For example: - // prefix = A/ - // delimiter = / - // and objects: - // A/file - // A/B/file - // B/C - // would return: - // objects: A/file - // common prefix: A/B/ - // - // Note: No objects are included past the common prefix since - // these are all rolled up into the common prefix. - // Note: The delimiter can be anything, so we have to operate on - // the full path without any assumptions on posix directory hierarchy - // here. Usually the delimiter will be pathSeparator, but thats not required. - suffix := strings.TrimPrefix(path, prefix) - before, _, found := strings.Cut(suffix, delimiter) - if !found { - res, err := getObj(path, versionIdMarker, &pastVersionIdMarker, max-len(objects)-len(delMarkers)-cpmap.Len(), d) - if err == ErrSkipObj { - return nil - } - if err != nil { - return fmt.Errorf("file to object %q: %w", path, err) - } - objects = append(objects, res.ObjectVersions...) - delMarkers = append(delMarkers, res.DelMarkers...) - - if res.Truncated { - truncated = true - nextMarker = path - nextVersionIdMarker = res.NextVersionIdMarker - return fs.SkipAll - } - return nil - } - - // Common prefixes are a set, so should not have duplicates. - // These are abstractly a "directory", so need to include the - // delimiter at the end. - cpmap.Add(prefix + before + delimiter) - if (len(objects) + cpmap.Len()) == int(max) { - nextMarker = path - truncated = true - - return fs.SkipAll - } - - return nil - }) - if err != nil { return WalkVersioningResults{}, err } + if !state.truncated { + state.newMarker = "" + } + return WalkVersioningResults{ - CommonPrefixes: cpmap.CpArray(), - ObjectVersions: objects, - DelMarkers: delMarkers, - Truncated: truncated, - NextMarker: nextMarker, - NextVersionIdMarker: nextVersionIdMarker, + CommonPrefixes: state.cpmap.CpArray(), + ObjectVersions: state.objectVersions, + DelMarkers: state.delMarkers, + Truncated: state.truncated, + NextMarker: state.newMarker, + NextVersionIdMarker: state.newVersionIdMarker, }, nil } + +// versionWalkState holds the state needed during version directory traversal +type versionWalkState struct { + ctx context.Context + fileSystem fs.FS + prefix string + delimiter string + keyMarker string + versionIdMarker string + max int + getVersions GetVersionsFunc + skipdirs []string + cpmap cpMap + objectVersions []s3response.ObjectVersion + delMarkers []types.DeleteMarkerEntry + pastMarker bool + pastVersionIdMarker bool + pastMax bool + newMarker string + newVersionIdMarker string + truncated bool +} + +// handleVersionPrefixDirectory handles the special case where we need to include the prefix directory itself +func (v *versionWalkState) handleVersionPrefixDirectory(prefix string) error { + prefixDir := strings.TrimSuffix(prefix, pathSeparator) + name := prefixDir + if idx := strings.LastIndex(prefixDir, pathSeparator); idx >= 0 { + name = prefixDir[idx+1:] + } + if v.pastMarker || v.keyMarker == "" || prefixDir <= v.keyMarker { + if v.keyMarker == "" || prefixDir <= v.keyMarker { // equality also allowed to advance marker + if prefixDir != v.keyMarker { // avoid duplicate if marker points exactly here + err := v.addVersions(prefixDir, &fakeDirEntry{name: name, isDir: true}) + if err != nil { + return err + } + } + if prefixDir == v.keyMarker { + v.pastMarker = true + } + } + } + return nil +} + +// walkLexSortVersions performs lexicographically ordered traversal for versions +func (v *versionWalkState) walkLexSortVersions(dir string) error { + if v.ctx.Err() != nil { + return v.ctx.Err() + } + + entries, err := buildSortedEntries(v.fileSystem, dir, v.skipdirs) + if err != nil { + return err + } + + for i, entry := range entries { + if v.checkLimits() { + return v.ctx.Err() + } + + if v.pastMax { + // We have more entries to process, so mark as truncated + v.truncated = true + return nil + } + + if err := v.processVersionEntry(entry); err != nil { + return err + } + + // After processing an entry, check if we hit the limit + if v.pastMax { + // We hit the limit. Check if there are more entries after this one + if i+1 < len(entries) { + // There are more entries in this directory + v.truncated = true + } + // This was the last entry in this directory, but there might be + // more dirs to recurse into + // Let the higher level determine if we're truly done + return nil + } + } + + return nil +} + +func (v *versionWalkState) processVersionEntry(entry entryWithSortKey) error { + path := entry.path + d := entry.d + if d.IsDir() { + return v.processVersionDirectory(path, d) + } + return v.processVersionFile(path, d) +} + +// processVersionDirectory handles directory processing logic +func (v *versionWalkState) processVersionDirectory(path string, d fs.DirEntry) error { + dirPath := path + pathSeparator + + // Check if we should skip this directory based on prefix + if shouldSkipDirectory(dirPath, v.prefix) { + return nil + } + + // Handle marker logic for directories + if v.isBeforeMarker(path) { // path used (no trailing /) consistent with markers + // Before marker - only recurse if marker could be inside + if strings.HasPrefix(v.keyMarker, dirPath) { + return v.walkLexSortVersions(path) + } + return nil + } + + if v.isAtMarker(path) { + // At marker - recurse but don't include as common prefix + v.pastMarker = true + return v.walkLexSortVersions(path) + } + + // Apply prefix filter + if v.prefix != "" && !strings.HasPrefix(dirPath, v.prefix) { + return v.walkLexSortVersions(path) + } + + if v.delimiter != "" { + return v.processVersionDirectoryWithDelimiter(path, dirPath, d) + } + return v.processVersionDirectoryWithoutDelimiter(path, dirPath, d) +} + +// processVersionDirectoryWithDelimiter handles directory processing when delimiter is specified +func (v *versionWalkState) processVersionDirectoryWithDelimiter(path, dirPath string, d fs.DirEntry) error { + // Special case: if the directory path exactly matches the prefix, return it as an object + if dirPath == v.prefix { + if err := v.addVersions(path, d); err != nil { + return err + } + if v.pastMax { + return nil + } + } + + // Handle delimiter logic for directories + suffix := strings.TrimPrefix(dirPath, v.prefix) + before, _, found := strings.Cut(suffix, v.delimiter) + + if found { + // Directory creates a common prefix + cprefNoDelim := v.prefix + before + cpref := cprefNoDelim + v.delimiter + + if v.handleCommonPrefix(cpref) { + return nil + } + + if v.keyMarker != "" && strings.HasPrefix(v.keyMarker, cprefNoDelim) { + return v.walkLexSortVersions(path) + } + } + + return v.walkLexSortVersions(path) +} + +// processVersionDirectoryWithoutDelimiter handles directory processing when no delimiter is specified +func (v *versionWalkState) processVersionDirectoryWithoutDelimiter(path, dirPath string, d fs.DirEntry) error { + // Include directory as object if it matches prefix + if v.prefix == "" || strings.HasPrefix(dirPath, v.prefix) { + if err := v.addVersions(path, d); err != nil { + return err + } + if v.pastMax { + return nil + } + } + + return v.walkLexSortVersions(path) +} + +// processVersionFile handles file processing logic +func (v *versionWalkState) processVersionFile(path string, d fs.DirEntry) error { + if v.isBeforeMarker(path) { + return nil + } + + if v.isAtMarker(path) { + v.pastMarker = true + // Don't return here - we need to process this object for remaining versions + } + + if v.prefix != "" && !strings.HasPrefix(path, v.prefix) { + return nil + } + + if v.delimiter != "" { + return v.processVersionFileWithDelimiter(path, d) + } + return v.processVersionFileWithoutDelimiter(path, d) +} + +// processVersionFileWithDelimiter handles file processing when delimiter is specified +func (v *versionWalkState) processVersionFileWithDelimiter(path string, d fs.DirEntry) error { + suffix := strings.TrimPrefix(path, v.prefix) + before, _, found := strings.Cut(suffix, v.delimiter) + + if !found { + // File doesn't contain delimiter after prefix - include it + if err := v.addVersions(path, d); err != nil { + return err + } + if v.pastMax { + return nil + } + return nil + } + + // File contains delimiter after prefix - add to common prefixes + cprefNoDelim := v.prefix + before + cpref := cprefNoDelim + v.delimiter + + if v.handleCommonPrefix(cpref) { + return nil + } + + return nil +} + +// processVersionFileWithoutDelimiter handles file processing when no delimiter is specified +func (v *versionWalkState) processVersionFileWithoutDelimiter(path string, d fs.DirEntry) error { + if err := v.addVersions(path, d); err != nil { + return err + } + return nil +} + +// available returns remaining capacity (objects + delete markers + prefixes) +// before reaching the max listing size. +func (v *versionWalkState) available() int { + return v.max - (len(v.objectVersions) + len(v.delMarkers) + v.cpmap.Len()) +} + +// checkLimits returns true if we've passed the max or context cancelled. +func (v *versionWalkState) checkLimits() bool { + return v.pastMax || v.ctx.Err() != nil +} + +// isBeforeMarker determines if path sorts before current key marker. +func (v *versionWalkState) isBeforeMarker(path string) bool { + return !v.pastMarker && path < v.keyMarker +} + +// isAtMarker determines if path equals the current key marker. +func (v *versionWalkState) isAtMarker(path string) bool { + return !v.pastMarker && path == v.keyMarker +} + +// addVersions fetches version information for a path and updates state. +func (v *versionWalkState) addVersions(path string, d fs.DirEntry) error { + if v.available() <= 0 { + v.pastMax = true + return nil + } + res, err := v.getVersions(path, v.versionIdMarker, &v.pastVersionIdMarker, v.available(), d) + if err == ErrSkipObj { + return nil + } + if err != nil { + return fmt.Errorf("versions for object %q: %w", path, err) + } + v.objectVersions = append(v.objectVersions, res.ObjectVersions...) + v.delMarkers = append(v.delMarkers, res.DelMarkers...) + if res.Truncated { + v.truncated = true + v.newMarker = path + v.newVersionIdMarker = res.NextVersionIdMarker + v.pastMax = true + return nil + } + if v.available() <= 0 { + v.newMarker = path + v.pastMax = true + } + return nil +} + +// addCommonPrefix records a common prefix if capacity allows. +func (v *versionWalkState) addCommonPrefix(cpref string) { + if v.pastMax { + return + } + v.cpmap.Add(cpref) + if v.available() <= 0 { + v.newMarker = cpref + v.pastMax = true + } +} + +// handleCommonPrefix processes common prefix logic for version walking +func (v *versionWalkState) handleCommonPrefix(cpref string) bool { + if v.isBeforeMarker(cpref) { + return false // Don't process this prefix + } + + if v.isAtMarker(cpref) { + v.pastMarker = true + return false // Don't include as common prefix but continue processing + } + + // Skip if this common prefix is <= marker (for when pastMarker is already true) + if v.keyMarker != "" && cpref <= v.keyMarker { + return false + } + + if v.pastMax { + v.truncated = true + return true // Stop processing + } + + v.addCommonPrefix(cpref) + return v.pastMax // Return true if we should stop processing +} diff --git a/backend/walk_test.go b/backend/walk_test.go index 2314aa30..ed0335a7 100644 --- a/backend/walk_test.go +++ b/backend/walk_test.go @@ -419,30 +419,14 @@ func TestOrderWalk(t *testing.T) { prefix: "dir1/", expected: backend.WalkResults{ Objects: []s3response.Object{ - { - Key: backend.GetPtrFromString("dir1/"), - }, - { - Key: backend.GetPtrFromString("dir1/a.b/"), - }, - { - Key: backend.GetPtrFromString("dir1/a.b/file1"), - }, - { - Key: backend.GetPtrFromString("dir1/a.b/file2"), - }, - { - Key: backend.GetPtrFromString("dir1/a/"), - }, - { - Key: backend.GetPtrFromString("dir1/a/file1"), - }, - { - Key: backend.GetPtrFromString("dir1/a/file2"), - }, - { - Key: backend.GetPtrFromString("dir1/a/file3"), - }, + {Key: backend.GetPtrFromString("dir1/")}, + {Key: backend.GetPtrFromString("dir1/a.b/")}, + {Key: backend.GetPtrFromString("dir1/a.b/file1")}, + {Key: backend.GetPtrFromString("dir1/a.b/file2")}, + {Key: backend.GetPtrFromString("dir1/a/")}, + {Key: backend.GetPtrFromString("dir1/a/file1")}, + {Key: backend.GetPtrFromString("dir1/a/file2")}, + {Key: backend.GetPtrFromString("dir1/a/file3")}, }, }, }, @@ -801,3 +785,309 @@ func comparePrefixesOrdered(a, b []types.CommonPrefix) bool { } return true } + +// ---- Versioning Tests ---- + +// getVersionsTestFunc is a simple GetVersionsFunc implementation for tests that +// returns a single latest version for each file or directory encountered. +// Directories are reported with a trailing delimiter in the key to match the +// behavior of the non-versioned Walk tests where directory objects are listed. +func getVersionsTestFunc(path, versionIdMarker string, pastVersionIdMarker *bool, availableObjCount int, d fs.DirEntry) (*backend.ObjVersionFuncResult, error) { + // If we have no available slots left, signal truncation (should be rare in these tests) + if availableObjCount <= 0 { + return &backend.ObjVersionFuncResult{Truncated: true, NextVersionIdMarker: ""}, nil + } + + key := path + if d.IsDir() { + key = key + "/" + } + ver := "v1" + latest := true + ov := s3response.ObjectVersion{Key: &key, VersionId: &ver, IsLatest: &latest} + return &backend.ObjVersionFuncResult{ObjectVersions: []s3response.ObjectVersion{ov}}, nil +} + +// TestWalkVersions mirrors TestWalk but exercises WalkVersions and validates +// common prefixes and object versions for typical delimiter/prefix scenarios. +func TestWalkVersions(t *testing.T) { + fsys := fstest.MapFS{ + "dir1/a/file1": {}, + "dir1/a/file2": {}, + "dir1/b/file3": {}, + "rootfile": {}, + } + + // Without a delimiter, every directory and file becomes an object version + // via the test GetVersionsFunc (directories have trailing '/'). + expected := backend.WalkVersioningResults{ + ObjectVersions: []s3response.ObjectVersion{ + {Key: backend.GetPtrFromString("dir1/")}, + {Key: backend.GetPtrFromString("dir1/a/")}, + {Key: backend.GetPtrFromString("dir1/a/file1")}, + {Key: backend.GetPtrFromString("dir1/a/file2")}, + {Key: backend.GetPtrFromString("dir1/b/")}, + {Key: backend.GetPtrFromString("dir1/b/file3")}, + {Key: backend.GetPtrFromString("rootfile")}, + }, + } + + res, err := backend.WalkVersions(context.Background(), fsys, "", "", "", "", 1000, getVersionsTestFunc, []string{}) + if err != nil { + t.Fatalf("walk versions: %v", err) + } + compareVersionResultsOrdered("simple versions no delimiter", res, expected, t) +} + +// TestOrderWalkVersions mirrors TestOrderWalk, exercising ordering semantics for +// version listings (lexicographic ordering of directory and file version keys). +func TestOrderWalkVersions(t *testing.T) { + fsys := fstest.MapFS{ + "dir1/a/file1": {}, + "dir1/a/file2": {}, + "dir1/a/file3": {}, + "dir1/a.b/file1": {}, + "dir1/a.b/file2": {}, + } + + // Expect lexicographic ordering similar to non-version walk when no delimiter. + expected := backend.WalkVersioningResults{ + ObjectVersions: []s3response.ObjectVersion{ + {Key: backend.GetPtrFromString("dir1/")}, + {Key: backend.GetPtrFromString("dir1/a.b/")}, + {Key: backend.GetPtrFromString("dir1/a.b/file1")}, + {Key: backend.GetPtrFromString("dir1/a.b/file2")}, + {Key: backend.GetPtrFromString("dir1/a/")}, + {Key: backend.GetPtrFromString("dir1/a/file1")}, + {Key: backend.GetPtrFromString("dir1/a/file2")}, + {Key: backend.GetPtrFromString("dir1/a/file3")}, + }, + } + + res, err := backend.WalkVersions(context.Background(), fsys, "dir1/", "", "", "", 1000, getVersionsTestFunc, []string{}) + if err != nil { + t.Fatalf("order walk versions: %v", err) + } + compareVersionResultsOrdered("order versions no delimiter", res, expected, t) +} + +// compareVersionResults compares unordered sets of common prefixes and object versions +// compareVersionResultsOrdered compares ordered slices +func compareVersionResultsOrdered(name string, got, wanted backend.WalkVersioningResults, t *testing.T) { + if !compareObjectVersionsOrdered(got.ObjectVersions, wanted.ObjectVersions) { + t.Errorf("%v: unexpected object versions, got %v wanted %v", name, printVersionObjects(got.ObjectVersions), printVersionObjects(wanted.ObjectVersions)) + } + if !comparePrefixesOrdered(got.CommonPrefixes, wanted.CommonPrefixes) { + t.Errorf("%v: unexpected prefix, got %v wanted %v", name, printCommonPrefixes(got.CommonPrefixes), printCommonPrefixes(wanted.CommonPrefixes)) + } +} + +func compareObjectVersionsOrdered(a, b []s3response.ObjectVersion) bool { + if len(a) == 0 && len(b) == 0 { + return true + } + if len(a) != len(b) { + return false + } + for i, ov := range a { + if ov.Key == nil || b[i].Key == nil { + return false + } + if *ov.Key != *b[i].Key { + return false + } + } + return true +} + +func printVersionObjects(list []s3response.ObjectVersion) string { + res := "[" + for _, ov := range list { + var key string + if ov.Key == nil { + key = "" + } else { + key = *ov.Key + } + if res == "[" { + res = res + key + } else { + res = res + ", " + key + } + } + return res + "]" +} + +// multiVersionGetVersionsFunc is a more sophisticated test function that simulates +// multiple versions per object, similar to the integration test behavior. +// It creates multiple versions for each file with deterministic version IDs. +func createMultiVersionFunc(files map[string]int) backend.GetVersionsFunc { + // Pre-generate all versions for deterministic testing + versionedFiles := make(map[string][]s3response.ObjectVersion) + + for path, versionCount := range files { + versions := make([]s3response.ObjectVersion, versionCount) + for i := range versionCount { + versionId := fmt.Sprintf("v%d", i+1) + isLatest := i == versionCount-1 // Last version is latest + key := path + + versions[i] = s3response.ObjectVersion{ + Key: &key, + VersionId: &versionId, + IsLatest: &isLatest, + } + } + // Reverse slice so latest comes first (reverse chronological order) + for i, j := 0, len(versions)-1; i < j; i, j = i+1, j-1 { + versions[i], versions[j] = versions[j], versions[i] + } + versionedFiles[path] = versions + } + + return func(path, versionIdMarker string, pastVersionIdMarker *bool, availableObjCount int, d fs.DirEntry) (*backend.ObjVersionFuncResult, error) { + if availableObjCount <= 0 { + return &backend.ObjVersionFuncResult{Truncated: true}, nil + } + + // Handle directories - just return a single directory version + if d.IsDir() { + key := path + "/" + ver := "v1" + latest := true + ov := s3response.ObjectVersion{Key: &key, VersionId: &ver, IsLatest: &latest} + return &backend.ObjVersionFuncResult{ObjectVersions: []s3response.ObjectVersion{ov}}, nil + } + + // Get versions for this file + versions, exists := versionedFiles[path] + if !exists { + // No versions for this file, skip it + return &backend.ObjVersionFuncResult{}, backend.ErrSkipObj + } + + // Handle version ID marker pagination + startIdx := 0 + if versionIdMarker != "" && !*pastVersionIdMarker { + // Find the starting position after the marker + for i, version := range versions { + if *version.VersionId == versionIdMarker { + startIdx = i + 1 + *pastVersionIdMarker = true + break + } + } + } + + // Return available versions up to the limit + endIdx := min(startIdx+availableObjCount, len(versions)) + + result := &backend.ObjVersionFuncResult{ + ObjectVersions: versions[startIdx:endIdx], + } + + // Check if we need to truncate + if endIdx < len(versions) { + result.Truncated = true + result.NextVersionIdMarker = *versions[endIdx-1].VersionId + } + + return result, nil + } +} + +// TestWalkVersionsTruncated tests the pagination behavior of WalkVersions +// when there are multiple versions per object and the result is truncated. +// This mirrors the integration test ListObjectVersions_multiple_object_versions_truncated. +func TestWalkVersionsTruncated(t *testing.T) { + // Create filesystem with the same files as integration test + fsys := fstest.MapFS{ + "foo": {}, + "bar": {}, + "baz": {}, + } + + // Define version counts per file (matching integration test) + versionCounts := map[string]int{ + "foo": 4, // 4 versions + "bar": 3, // 3 versions + "baz": 5, // 5 versions + } + + getVersionsFunc := createMultiVersionFunc(versionCounts) + + // Test first page with limit of 5 (should be truncated) + maxKeys := 5 + res1, err := backend.WalkVersions(context.Background(), fsys, "", "", "", "", maxKeys, getVersionsFunc, []string{}) + if err != nil { + t.Fatalf("walk versions first page: %v", err) + } + + // Verify first page results + if !res1.Truncated { + t.Error("expected first page to be truncated") + } + + if len(res1.ObjectVersions) != maxKeys { + t.Errorf("expected %d versions in first page, got %d", maxKeys, len(res1.ObjectVersions)) + } + + // Expected order: bar (3 versions), baz (2 versions) - lexicographic order + expectedFirstPage := []string{"bar", "bar", "bar", "baz", "baz"} + if len(res1.ObjectVersions) != len(expectedFirstPage) { + t.Fatalf("first page length mismatch: expected %d, got %d", len(expectedFirstPage), len(res1.ObjectVersions)) + } + + for i, expected := range expectedFirstPage { + if res1.ObjectVersions[i].Key == nil || *res1.ObjectVersions[i].Key != expected { + t.Errorf("first page[%d]: expected key %s, got %v", i, expected, res1.ObjectVersions[i].Key) + } + } + + // Verify next markers are set + if res1.NextMarker == "" { + t.Error("expected NextMarker to be set on truncated result") + } + if res1.NextVersionIdMarker == "" { + t.Error("expected NextVersionIdMarker to be set on truncated result") + } + + // Test second page using markers + res2, err := backend.WalkVersions(context.Background(), fsys, "", "", res1.NextMarker, res1.NextVersionIdMarker, maxKeys, getVersionsFunc, []string{}) + if err != nil { + t.Fatalf("walk versions second page: %v", err) + } + + t.Logf("Second page: ObjectVersions=%d, Truncated=%v, NextMarker=%s, NextVersionIdMarker=%s", + len(res2.ObjectVersions), res2.Truncated, res2.NextMarker, res2.NextVersionIdMarker) + + for i, ov := range res2.ObjectVersions { + t.Logf(" [%d] Key=%s, VersionId=%s", i, *ov.Key, *ov.VersionId) + } + + // Verify second page results + // With maxKeys=5, we should have 3 pages total: 5 + 5 + 2 = 12 + + // Test third page if needed + var res3 backend.WalkVersioningResults + if res2.Truncated { + res3, err = backend.WalkVersions(context.Background(), fsys, "", "", res2.NextMarker, res2.NextVersionIdMarker, maxKeys, getVersionsFunc, []string{}) + if err != nil { + t.Fatalf("walk versions third page: %v", err) + } + + t.Logf("Third page: ObjectVersions=%d, Truncated=%v, NextMarker=%s, NextVersionIdMarker=%s", + len(res3.ObjectVersions), res3.Truncated, res3.NextMarker, res3.NextVersionIdMarker) + + for i, ov := range res3.ObjectVersions { + t.Logf(" [%d] Key=%s, VersionId=%s", i, *ov.Key, *ov.VersionId) + } + } + + // Verify total count across all pages + totalVersions := len(res1.ObjectVersions) + len(res2.ObjectVersions) + len(res3.ObjectVersions) + expectedTotal := versionCounts["foo"] + versionCounts["bar"] + versionCounts["baz"] + if totalVersions != expectedTotal { + t.Errorf("total versions mismatch: expected %d, got %d", expectedTotal, totalVersions) + } +} diff --git a/tests/integration/tests.go b/tests/integration/tests.go index abf632e4..617cd288 100644 --- a/tests/integration/tests.go +++ b/tests/integration/tests.go @@ -23080,7 +23080,7 @@ func ListObjectVersions_multiple_object_versions_truncated(s *S3Conf) error { if !compareVersions(versions[:maxKeys], out.Versions) { return fmt.Errorf("expected the resulting object versions to be %v, instead got %v", - versions[:maxKeys], out.Versions) + sprintVersions(versions[:maxKeys]), sprintVersions(out.Versions)) } ctx, cancel = context.WithTimeout(context.Background(), shortTimeout) @@ -23116,7 +23116,7 @@ func ListObjectVersions_multiple_object_versions_truncated(s *S3Conf) error { if !compareVersions(versions[maxKeys:], out.Versions) { return fmt.Errorf("expected the resulting object versions to be %v, instead got %v", - versions[maxKeys:], out.Versions) + sprintVersions(versions[:maxKeys]), sprintVersions(out.Versions)) } return nil diff --git a/tests/integration/utils.go b/tests/integration/utils.go index b4e61e03..3ca0aa7e 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -1839,3 +1839,16 @@ func sprintPrefixes(cpfx []types.CommonPrefix) string { return strings.Join(names, ",") } + +func sprintVersions(objects []types.ObjectVersion) string { + if len(objects) == 0 { + return "" + } + + names := make([]string, len(objects)) + for i, obj := range objects { + names[i] = fmt.Sprintf("%v/%v", *obj.Key, obj.VersionId) + } + + return strings.Join(names, ",") +}