Files
versitygw/backend/walk.go
Ben McClelland 34da18337e fix: lex sort order of listobjectversions backend.WalkVersions
Similar to:
  8e18b43116
  fix: lex sort order of listobjects backend.Walk
But now the "Versions" walk.

The original backend.WalkVersions function used the native WalkDir and ReadDir
which did not guarantee lexicographic ordering of results for cases where
including directory slash changes the sort order. This caused incorrect
paginated responses because S3 APIs require strict lexicographic ordering
where directories with trailing slashes sort correctly relative to files.
For example, dir1/a.b/ must come before dir1/a/ in the results, but
fs.WalkDir was returning them in filesystem sort order which reversed
the order due to not taking into account the trailing "/".
2025-09-12 11:49:58 -07:00

1006 lines
26 KiB
Go

// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package backend
import (
"context"
"errors"
"fmt"
"io/fs"
"slices"
"sort"
"strings"
"syscall"
"time"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/versity/versitygw/s3response"
)
// WalkResults is the outcome of a Walk call.
type WalkResults struct {
// CommonPrefixes lists the common prefixes collected during the walk.
CommonPrefixes []types.CommonPrefix
// Objects lists the objects collected during the walk.
Objects []s3response.Object
// Truncated is true when the walk stopped at the max limit and found
// that more entries remained.
Truncated bool
// NextMarker is the last key or common prefix recorded when the limit
// was hit; Walk clears it when Truncated is false.
NextMarker string
}
// GetObjFunc converts the entry d found at path into an s3response.Object.
// Returning ErrSkipObj makes the walk leave the entry out of the results.
type GetObjFunc func(path string, d fs.DirEntry) (s3response.Object, error)
// ErrSkipObj is the sentinel error a GetObjFunc or GetVersionsFunc returns
// to have the current entry skipped without failing the walk.
var ErrSkipObj = errors.New("skip this object")
var (
// pathSeparator is placed between path components and appended to
// directory paths.
pathSeparator = "/"
// pathDot names the root directory of the walked file system.
pathDot = "."
)
// fakeDirEntry is a synthetic fs.DirEntry used for implied directories,
// i.e. entries that were not obtained from a directory read.
type fakeDirEntry struct {
	name  string
	isDir bool
}

// Name reports the name the entry was created with.
func (f *fakeDirEntry) Name() string { return f.name }

// IsDir reports whether the entry stands for a directory.
func (f *fakeDirEntry) IsDir() bool { return f.isDir }

// Type yields fs.ModeDir for directories and no type bits otherwise.
func (f *fakeDirEntry) Type() fs.FileMode {
	var mode fs.FileMode
	if f.isDir {
		mode = fs.ModeDir
	}
	return mode
}

// Info builds the matching fakeDirInfo. It never returns an error.
func (f *fakeDirEntry) Info() (fs.FileInfo, error) {
	info := &fakeDirInfo{
		name:  f.name,
		isDir: f.isDir,
	}
	return info, nil
}
// fakeDirInfo is a synthetic fs.FileInfo for implied directories. Only the
// name and the directory flag carry information; every other attribute is
// a zero value.
type fakeDirInfo struct {
	name  string
	isDir bool
}

// Name reports the name the info was created with.
func (f *fakeDirInfo) Name() string { return f.name }

// Size is always zero.
func (f *fakeDirInfo) Size() int64 { return 0 }

// Mode yields fs.ModeDir for directories and an empty mode otherwise.
func (f *fakeDirInfo) Mode() fs.FileMode {
	var mode fs.FileMode
	if f.isDir {
		mode = fs.ModeDir
	}
	return mode
}

// ModTime is always the zero time, since a fake entry has no timestamp.
func (f *fakeDirInfo) ModTime() time.Time {
	var zero time.Time
	return zero
}

// IsDir reports whether the info stands for a directory.
func (f *fakeDirInfo) IsDir() bool { return f.isDir }

// Sys has no underlying data source and always returns nil.
func (f *fakeDirInfo) Sys() any { return nil }
// cpMap stores object common prefixes. Each prefix maps to the position at
// which it was first added.
type cpMap map[string]int

// Add records key at the next free position. A key that is already present
// keeps its original position.
func (c cpMap) Add(key string) {
	if _, seen := c[key]; seen {
		return
	}
	c[key] = len(c)
}

// Len returns the number of distinct prefixes stored in the map.
func (c cpMap) Len() int {
	return len(c)
}
// CpArray converts the map into a []types.CommonPrefix in which every prefix
// sits at the position recorded when it was added.
func (c cpMap) CpArray() []types.CommonPrefix {
	out := make([]types.CommonPrefix, len(c))
	for prefix, pos := range c {
		// Copy before taking the address so each element owns its string.
		p := prefix
		out[pos] = types.CommonPrefix{Prefix: &p}
	}
	return out
}
// walkState holds the state needed during directory traversal.
// Walk fills in the first group of fields once; the fields under
// "Mutable state" are updated while entries are processed.
type walkState struct {
// ctx is checked for cancellation while walking.
ctx context.Context
// fileSystem is the file system being listed.
fileSystem fs.FS
// prefix restricts the results to keys that start with it.
prefix string
// delimiter, when non-empty, rolls keys up into common prefixes.
delimiter string
// marker is the key the listing resumes after; entries at or before it
// are not reported.
marker string
// max is the limit on objects plus common prefixes.
max int32
// getObj converts a directory entry into an object.
getObj GetObjFunc
// skipdirs lists entry names that are dropped from every directory read.
skipdirs []string
// Mutable state
// cpmap collects the common prefixes found so far.
cpmap cpMap
// objects collects the objects found so far.
objects []s3response.Object
// pastMarker is true once the walk has moved beyond marker; Walk starts
// it as true when no marker was given.
pastMarker bool
// pastMax is true once max results have been collected.
pastMax bool
// newMarker is the key or common prefix recorded when the limit was hit.
newMarker string
// truncated is true when entries are known to remain after the limit.
truncated bool
}
// entryWithSortKey represents a directory entry with its lexicographic sort key
type entryWithSortKey struct {
// d is the entry as returned by the directory read.
d fs.DirEntry
// path is the entry's path, built from its parent directory and name.
path string
// sortKey is path, with a trailing separator appended for directories,
// so that directories sort the way their listed keys do.
sortKey string
}
// shouldSkipDirectory reports whether dirPath can be pruned for the given
// prefix. A directory is kept when there is no prefix, when it lies under the
// prefix, or when the prefix lies under it (so matches may still be found
// deeper down).
func shouldSkipDirectory(dirPath, prefix string) bool {
	switch {
	case prefix == "":
		return false
	case strings.HasPrefix(dirPath, prefix), strings.HasPrefix(prefix, dirPath):
		return false
	}
	return true
}
// constructPath joins dir and entryName with the path separator. Entries of
// the root directory (pathDot) are returned as the bare entry name.
func constructPath(dir, entryName string) string {
	if dir != pathDot {
		return dir + pathSeparator + entryName
	}
	return entryName
}
// createSortKey returns the key an entry is ordered by: the path itself for
// files, and the path followed by the path separator for directories.
func createSortKey(path string, isDir bool) string {
	key := path
	if isDir {
		key += pathSeparator
	}
	return key
}
// buildSortedEntries reads dir from fileSystem, drops every entry whose name
// appears in skipdirs, and returns the remaining entries ordered by their
// sort key (directories compare with a trailing separator). A read failure
// is returned unchanged.
func buildSortedEntries(fileSystem fs.FS, dir string, skipdirs []string) ([]entryWithSortKey, error) {
	dirents, err := fs.ReadDir(fileSystem, dir)
	if err != nil {
		return nil, err
	}

	var sorted []entryWithSortKey
	for _, dirent := range dirents {
		name := dirent.Name()
		if slices.Contains(skipdirs, name) {
			continue
		}

		full := constructPath(dir, name)
		sorted = append(sorted, entryWithSortKey{
			d:       dirent,
			path:    full,
			sortKey: createSortKey(full, dirent.IsDir()),
		})
	}

	// Order by sort key rather than by raw name so that the implied
	// trailing separator of directories takes part in the comparison.
	byKey := func(a, b int) bool {
		return sorted[a].sortKey < sorted[b].sortKey
	}
	sort.Slice(sorted, byKey)

	return sorted, nil
}
// checkLimits reports whether processing should stop, either because the
// result limit was reached or because the context has an error.
func (w *walkState) checkLimits() bool {
	if w.pastMax {
		return true
	}
	return w.ctx.Err() != nil
}
// addObject appends obj to the results. When objects plus common prefixes
// reach max it records path as the new marker, sets pastMax and returns true;
// otherwise it returns false.
func (w *walkState) addObject(obj s3response.Object, path string) bool {
	w.objects = append(w.objects, obj)

	collected := len(w.objects) + w.cpmap.Len()
	if collected < int(w.max) {
		return false
	}

	// truncated is deliberately left untouched: it is only set once the
	// walk knows that more items follow.
	w.pastMax = true
	w.newMarker = path
	return true
}
// addCommonPrefix records cpref as a common prefix. When objects plus common
// prefixes reach max it records cpref as the new marker, sets pastMax and
// returns true; otherwise it returns false.
func (w *walkState) addCommonPrefix(cpref string) bool {
	w.cpmap.Add(cpref)

	collected := len(w.objects) + w.cpmap.Len()
	if collected < int(w.max) {
		return false
	}

	// truncated is deliberately left untouched: it is only set once the
	// walk knows that more items follow.
	w.pastMax = true
	w.newMarker = cpref
	return true
}
// isBeforeMarker reports whether path sorts before the marker. It is always
// false once the walk is past the marker.
func (w *walkState) isBeforeMarker(path string) bool {
	if w.pastMarker {
		return false
	}
	return path < w.marker
}
// isAtMarker reports whether path equals the marker. It is always false once
// the walk is past the marker.
func (w *walkState) isAtMarker(path string) bool {
	if w.pastMarker {
		return false
	}
	return path == w.marker
}
// Walk walks the supplied fs.FS and returns results compatible with list
// objects responses.
//
// Only keys starting with prefix are reported. A non-empty delimiter rolls
// keys up into common prefixes. Listing resumes after marker, and at most max
// objects plus common prefixes are returned (max == 0 yields an empty
// result). getObj turns each entry into an object, and entries named in
// skipdirs are ignored. A walk root that does not exist, or is not a
// directory, produces an empty result rather than an error.
func Walk(ctx context.Context, fileSystem fs.FS, prefix, delimiter, marker string, max int32, getObj GetObjFunc, skipdirs []string) (WalkResults, error) {
	// Nothing can be returned when no results were requested.
	if max == 0 {
		return WalkResults{Truncated: false}, nil
	}

	state := &walkState{
		ctx:        ctx,
		fileSystem: fileSystem,
		prefix:     prefix,
		delimiter:  delimiter,
		marker:     marker,
		max:        max,
		getObj:     getObj,
		skipdirs:   skipdirs,
		cpmap:      cpMap{},
		pastMarker: marker == "",
	}

	// Start at the directory part of the prefix when it has one; a
	// separator at index 0 (or none at all) keeps the walk at the root.
	root := pathDot
	if idx := strings.LastIndex(prefix, pathSeparator); idx > 0 {
		root = prefix[:idx]
	}

	// Special case: a delimiter-terminated prefix that names an empty
	// single-level directory (e.g. "a" for prefix "a/"). Walk from the
	// parent so the directory itself is seen as an entry.
	if delimiter != "" && prefix != "" {
		prefixDir, ok := strings.CutSuffix(prefix, delimiter)
		if ok && !strings.Contains(prefixDir, delimiter) {
			entries, err := fs.ReadDir(fileSystem, prefixDir)
			if err == nil && len(entries) == 0 {
				root = pathDot
			}
		}
	}

	// Without a delimiter, a prefix ending in the separator names a
	// directory that the walk below never visits as an entry.
	if delimiter == "" && prefix != "" && strings.HasSuffix(prefix, pathSeparator) {
		if err := state.handlePrefixDirectory(prefix); err != nil {
			return WalkResults{}, err
		}
	}

	switch err := state.walkLexSort(root); {
	case err == nil:
		// fall out of the switch and assemble the results
	case errors.Is(err, fs.ErrNotExist), errors.Is(err, syscall.ENOTDIR):
		return WalkResults{}, nil
	default:
		return WalkResults{}, err
	}

	// A marker is only meaningful when the listing was cut short.
	if !state.truncated {
		state.newMarker = ""
	}

	results := WalkResults{
		CommonPrefixes: state.cpmap.CpArray(),
		Objects:        state.objects,
		Truncated:      state.truncated,
		NextMarker:     state.newMarker,
	}
	return results, nil
}
// handlePrefixDirectory handles the special case where the directory named by
// prefix must itself be reported as an object. Walk starts inside that
// directory, so the directory is never seen as an entry by the traversal.
//
// The directory is reported only when it sorts strictly after the marker (or
// no marker was given), matching how every other entry is filtered. When the
// prefix equals the marker, the walk is flagged as past the marker and the
// directory is not reported. A prefix that sorts before the marker was
// already covered by an earlier page and is left out, so it is not returned
// again on every following page.
//
// getObj may return ErrSkipObj to leave the directory out; any other error is
// wrapped and returned.
func (w *walkState) handlePrefixDirectory(prefix string) error {
	if w.marker != "" {
		if prefix == w.marker {
			// The previous page ended exactly on this directory.
			w.pastMarker = true
		}
		if prefix <= w.marker {
			// At or before the marker: reporting it would duplicate an
			// entry the caller has already moved past.
			return nil
		}
	}

	prefixDir := strings.TrimSuffix(prefix, pathSeparator)
	dirobj, err := w.getObj(prefix, &fakeDirEntry{name: prefixDir, isDir: true})
	switch err {
	case nil:
		// addObject updates the limit state; the caller's walk observes it.
		w.addObject(dirobj, prefix)
	case ErrSkipObj:
		// Skip this directory
	default:
		return fmt.Errorf("prefix directory to object %q: %w",
			prefix, err)
	}

	return nil
}
// walkLexSort performs lexicographically ordered directory traversal of dir,
// handing each entry to processEntry.
//
// It stops as soon as the result limit is reached. If the limit was already
// reached before an entry is looked at (for example because the prefix
// directory itself filled the page), that unprocessed entry proves more data
// exists and the listing is marked truncated. If the limit is reached while
// processing an entry, the listing is marked truncated when further entries
// remain in this directory; otherwise the decision is left to the calling
// level, which may still have entries of its own.
//
// A context error or an error from reading/processing entries is returned.
func (w *walkState) walkLexSort(dir string) error {
	if err := w.ctx.Err(); err != nil {
		return err
	}

	entries, err := buildSortedEntries(w.fileSystem, dir, w.skipdirs)
	if err != nil {
		return err
	}

	for i, entry := range entries {
		if w.checkLimits() {
			// Cancellation takes precedence over the result limit.
			if err := w.ctx.Err(); err != nil {
				return err
			}
			// The limit was hit earlier and there is still at least this
			// entry to process, so mark as truncated.
			w.truncated = true
			return nil
		}

		if err := w.processEntry(entry); err != nil {
			return err
		}

		// After processing an entry, check if we hit the limit.
		if w.pastMax {
			if i+1 < len(entries) {
				// There are more entries in this directory.
				w.truncated = true
			}
			// When this was the last entry, the higher level determines
			// whether anything else is left.
			return nil
		}
	}

	return nil
}
// processEntry dispatches a single entry to the file or directory handler,
// depending on what the entry reports itself to be.
func (w *walkState) processEntry(entry entryWithSortKey) error {
	if !entry.d.IsDir() {
		return w.processFile(entry.path, entry.d)
	}
	return w.processDirectory(entry.path, entry.d)
}
// processDirectory decides what to do with the directory at path: prune it,
// descend into it without reporting it, or hand it to the delimiter /
// non-delimiter handler. Comparisons use the path with a trailing separator.
func (w *walkState) processDirectory(path string, d fs.DirEntry) error {
	dirPath := path + pathSeparator

	// Nothing below this directory can match the prefix.
	if shouldSkipDirectory(dirPath, w.prefix) {
		return nil
	}

	switch {
	case w.isBeforeMarker(dirPath):
		// Before the marker: only descend when the marker could be inside.
		if !strings.HasPrefix(w.marker, dirPath) {
			return nil
		}
		return w.walkLexSort(path)

	case w.isAtMarker(dirPath):
		// At the marker: descend, but do not report the directory itself.
		w.pastMarker = true
		return w.walkLexSort(path)

	case w.prefix != "" && !strings.HasPrefix(dirPath, w.prefix):
		// The directory is only an ancestor of the prefix; keep descending.
		return w.walkLexSort(path)

	case w.delimiter != "":
		return w.processDirectoryWithDelimiter(path, dirPath)
	}

	return w.processDirectoryWithoutDelimiter(path, dirPath, d)
}
// processDirectoryWithDelimiter handles a directory when a delimiter is set.
// A directory whose path equals the prefix is reported as an object. If the
// part of the path after the prefix contains the delimiter, the directory is
// rolled up into a common prefix, subject to the marker and the result
// limit. Unless the limit is reached or the marker rules prune it, the walk
// continues into the directory.
func (w *walkState) processDirectoryWithDelimiter(path, dirPath string) error {
	// Special case: the directory is exactly the prefix, return it as an object.
	if dirPath == w.prefix {
		dirobj, err := w.getObj(dirPath, &fakeDirEntry{name: path, isDir: true})
		if err != nil && err != ErrSkipObj {
			return fmt.Errorf("directory to object %q: %w", dirPath, err)
		}
		// A skipped directory is still descended into below.
		if err == nil && w.addObject(dirobj, dirPath) {
			return nil
		}
	}

	rest := strings.TrimPrefix(dirPath, w.prefix)
	head, _, hasDelim := strings.Cut(rest, w.delimiter)
	if !hasDelim {
		return w.walkLexSort(path)
	}

	// The directory creates a common prefix.
	bare := w.prefix + head
	cpref := bare + w.delimiter

	switch {
	case w.isBeforeMarker(cpref):
		// Only descend when the marker may point inside this prefix.
		if w.marker == "" || !strings.HasPrefix(w.marker, bare) {
			return nil
		}
		return w.walkLexSort(path)

	case w.isAtMarker(cpref):
		w.pastMarker = true
		return w.walkLexSort(path)

	case w.marker != "" && cpref <= w.marker:
		// Prefix is <= marker although pastMarker is already set.
		return w.walkLexSort(path)

	case w.pastMax:
		w.truncated = true
		return nil

	case w.addCommonPrefix(cpref):
		// Adding the prefix filled the page.
		return nil
	}

	return w.walkLexSort(path)
}
// processDirectoryWithoutDelimiter handles a directory when no delimiter is
// set. A directory matching the prefix is reported as an object (unless
// getObj asks to skip it), and the walk then continues into it unless that
// object filled the page.
func (w *walkState) processDirectoryWithoutDelimiter(path, dirPath string, d fs.DirEntry) error {
	// Directories outside the prefix are only traversed, never reported.
	if w.prefix != "" && !strings.HasPrefix(dirPath, w.prefix) {
		return w.walkLexSort(path)
	}

	dirobj, err := w.getObj(dirPath, d)
	if err != nil && err != ErrSkipObj {
		return fmt.Errorf("directory to object %q: %w", dirPath, err)
	}
	// A skipped directory is still descended into below.
	if err == nil && w.addObject(dirobj, dirPath) {
		return nil
	}

	return w.walkLexSort(path)
}
// processFile applies the marker and prefix filters to the file at path and
// forwards it to the delimiter or non-delimiter handler. A file equal to the
// marker flags the walk as past the marker and is not reported.
func (w *walkState) processFile(path string, d fs.DirEntry) error {
	switch {
	case w.isBeforeMarker(path):
		return nil
	case w.isAtMarker(path):
		w.pastMarker = true
		return nil
	case w.prefix != "" && !strings.HasPrefix(path, w.prefix):
		return nil
	case w.delimiter == "":
		return w.processFileWithoutDelimiter(path, d)
	}
	return w.processFileWithDelimiter(path, d)
}
// processFileWithDelimiter handles a file when a delimiter is set. If the
// part of the path after the prefix contains the delimiter, the file
// contributes a common prefix, subject to the marker and the result limit;
// otherwise the file itself is reported as an object.
func (w *walkState) processFileWithDelimiter(path string, d fs.DirEntry) error {
	rest := strings.TrimPrefix(path, w.prefix)
	head, _, hasDelim := strings.Cut(rest, w.delimiter)

	if hasDelim {
		// File contains the delimiter after the prefix: common prefix.
		cpref := w.prefix + head + w.delimiter
		switch {
		case w.isBeforeMarker(cpref):
			// before the marker, nothing to record
		case w.isAtMarker(cpref):
			w.pastMarker = true
		case w.marker != "" && cpref <= w.marker:
			// prefix is <= marker although pastMarker is already set
		case w.pastMax:
			w.truncated = true
		default:
			w.addCommonPrefix(cpref)
		}
		return nil
	}

	// File doesn't contain the delimiter after the prefix: include it.
	obj, err := w.getObj(path, d)
	switch {
	case err == ErrSkipObj:
		return nil
	case err != nil:
		return fmt.Errorf("file to object %q: %w", path, err)
	case w.pastMax:
		w.truncated = true
		return nil
	}

	w.addObject(obj, path)
	return nil
}
// processFileWithoutDelimiter reports the file at path as an object when no
// delimiter is set. ErrSkipObj from getObj drops the file silently; any other
// error is wrapped and returned.
func (w *walkState) processFileWithoutDelimiter(path string, d fs.DirEntry) error {
	obj, err := w.getObj(path, d)
	switch err {
	case nil:
		w.addObject(obj, path)
	case ErrSkipObj:
		// leave the file out of the results
	default:
		return fmt.Errorf("file to object %q: %w", path, err)
	}
	return nil
}
// WalkVersioningResults is the outcome of a WalkVersions call.
type WalkVersioningResults struct {
// CommonPrefixes lists the common prefixes collected during the walk.
CommonPrefixes []types.CommonPrefix
// ObjectVersions lists the object versions collected during the walk.
ObjectVersions []s3response.ObjectVersion
// DelMarkers lists the delete markers collected during the walk.
DelMarkers []types.DeleteMarkerEntry
// Truncated is true when the walk stopped at the max limit with more
// results remaining.
Truncated bool
// NextMarker is the key or common prefix recorded when the limit was
// hit; WalkVersions clears it when Truncated is false.
NextMarker string
// NextVersionIdMarker is the version-id marker reported by the
// GetVersionsFunc call that was cut short, if any.
NextVersionIdMarker string
}
// ObjVersionFuncResult is what a GetVersionsFunc returns for a single path.
type ObjVersionFuncResult struct {
// ObjectVersions are appended to the walk's object versions.
ObjectVersions []s3response.ObjectVersion
// DelMarkers are appended to the walk's delete markers.
DelMarkers []types.DeleteMarkerEntry
// NextVersionIdMarker is copied into the walk results when Truncated is set.
NextVersionIdMarker string
// Truncated makes the walk stop at this path and mark the listing truncated.
Truncated bool
}
// GetVersionsFunc returns the versions and delete markers for the entry d at
// path. WalkVersions passes its version-id marker, a pointer to its "past
// version-id marker" flag (initially true only when no version-id marker was
// given), and the number of result slots still available.
// Returning ErrSkipObj leaves the entry out of the listing.
// NOTE(review): how implementations are expected to update
// *pastVersionIdMarker is not visible in this file — confirm against the
// backends.
type GetVersionsFunc func(path, versionIdMarker string, pastVersionIdMarker *bool, availableObjCount int, d fs.DirEntry) (*ObjVersionFuncResult, error)
// WalkVersions walks the supplied fs.FS and returns results compatible with
// ListObjectVersions action response.
//
// Only keys starting with prefix are reported. A non-empty delimiter rolls
// keys up into common prefixes. Listing resumes from keyMarker /
// versionIdMarker, and at most max versions, delete markers and common
// prefixes are returned (max == 0 yields an empty result). getVersions
// supplies the versions for each entry, and entries named in skipdirs are
// ignored. A walk root that does not exist, or is not a directory, produces
// an empty result rather than an error.
func WalkVersions(ctx context.Context, fileSystem fs.FS, prefix, delimiter, keyMarker, versionIdMarker string, max int, getVersions GetVersionsFunc, skipdirs []string) (WalkVersioningResults, error) {
	if max == 0 {
		return WalkVersioningResults{}, nil
	}

	state := &versionWalkState{
		ctx:                 ctx,
		fileSystem:          fileSystem,
		prefix:              prefix,
		delimiter:           delimiter,
		keyMarker:           keyMarker,
		versionIdMarker:     versionIdMarker,
		max:                 max,
		getVersions:         getVersions,
		skipdirs:            skipdirs,
		cpmap:               cpMap{},
		pastMarker:          keyMarker == "",
		pastVersionIdMarker: versionIdMarker == "",
	}

	// Determine the traversal root the same way Walk does: the directory
	// part of the prefix, or the file system root when there is none.
	root := pathDot
	if idx := strings.LastIndex(prefix, pathSeparator); idx > 0 {
		root = prefix[:idx]
	}

	// Without a delimiter, a prefix ending in the separator names a
	// directory that the walk below never visits as an entry.
	if delimiter == "" && prefix != "" && strings.HasSuffix(prefix, pathSeparator) {
		if err := state.handleVersionPrefixDirectory(prefix); err != nil {
			return WalkVersioningResults{}, err
		}
	}

	switch err := state.walkLexSortVersions(root); {
	case err == nil:
		// fall out of the switch and assemble the results
	case errors.Is(err, fs.ErrNotExist), errors.Is(err, syscall.ENOTDIR):
		return WalkVersioningResults{}, nil
	default:
		return WalkVersioningResults{}, err
	}

	// A key marker is only meaningful when the listing was cut short.
	if !state.truncated {
		state.newMarker = ""
	}

	results := WalkVersioningResults{
		CommonPrefixes:      state.cpmap.CpArray(),
		ObjectVersions:      state.objectVersions,
		DelMarkers:          state.delMarkers,
		Truncated:           state.truncated,
		NextMarker:          state.newMarker,
		NextVersionIdMarker: state.newVersionIdMarker,
	}
	return results, nil
}
// versionWalkState holds the state needed during version directory traversal.
// WalkVersions sets the fields from ctx through skipdirs once; the remaining
// fields are updated while entries are processed.
type versionWalkState struct {
// ctx is checked for cancellation while walking.
ctx context.Context
// fileSystem is the file system being listed.
fileSystem fs.FS
// prefix restricts the results to keys that start with it.
prefix string
// delimiter, when non-empty, rolls keys up into common prefixes.
delimiter string
// keyMarker is the key the listing resumes from.
keyMarker string
// versionIdMarker is handed to getVersions unchanged.
versionIdMarker string
// max is the limit on versions plus delete markers plus common prefixes.
max int
// getVersions fetches the versions for an entry.
getVersions GetVersionsFunc
// skipdirs lists entry names that are dropped from every directory read.
skipdirs []string
// cpmap collects the common prefixes found so far.
cpmap cpMap
// objectVersions collects the object versions found so far.
objectVersions []s3response.ObjectVersion
// delMarkers collects the delete markers found so far.
delMarkers []types.DeleteMarkerEntry
// pastMarker is true once the walk has reached or passed keyMarker;
// WalkVersions starts it as true when no key marker was given.
pastMarker bool
// pastVersionIdMarker is shared with getVersions through a pointer;
// WalkVersions starts it as true when no version-id marker was given.
pastVersionIdMarker bool
// pastMax is true once max results have been collected.
pastMax bool
// newMarker is the key or common prefix recorded when the limit was hit.
newMarker string
// newVersionIdMarker is the marker reported by a truncated getVersions call.
newVersionIdMarker string
// truncated is true when results are known to remain after the limit.
truncated bool
}
// handleVersionPrefixDirectory handles the special case where the directory
// named by prefix must itself be included. WalkVersions starts inside that
// directory, so the directory is never seen as an entry by the traversal.
//
// The directory is compared to the key marker without its trailing
// separator, the same form processVersionDirectory uses. Its versions are
// added only when it sorts strictly after the key marker (or no key marker
// was given). When it equals the key marker, the walk is flagged as past the
// marker and nothing is added. A directory that sorts before the key marker
// belongs to an earlier page and is left out, so it is not returned again on
// every following page.
//
// Errors from fetching the versions are returned as produced by addVersions.
func (v *versionWalkState) handleVersionPrefixDirectory(prefix string) error {
	prefixDir := strings.TrimSuffix(prefix, pathSeparator)

	if v.keyMarker != "" {
		if prefixDir == v.keyMarker {
			// The previous page ended exactly on this directory.
			v.pastMarker = true
		}
		if prefixDir <= v.keyMarker {
			// At or before the key marker: adding it would duplicate an
			// entry the caller has already moved past.
			return nil
		}
	}

	// The fake entry carries only the last path component as its name.
	name := prefixDir
	if idx := strings.LastIndex(prefixDir, pathSeparator); idx >= 0 {
		name = prefixDir[idx+1:]
	}

	return v.addVersions(prefixDir, &fakeDirEntry{name: name, isDir: true})
}
// walkLexSortVersions performs lexicographically ordered traversal of dir for
// version listings, handing each entry to processVersionEntry.
//
// It stops as soon as the result limit is reached. If the limit was already
// reached before an entry is looked at (for example because the prefix
// directory itself filled the page), that unprocessed entry proves more data
// exists and the listing is marked truncated. If the limit is reached while
// processing an entry, the listing is marked truncated when further entries
// remain in this directory; otherwise the decision is left to the calling
// level, which may still have entries of its own.
//
// A context error or an error from reading/processing entries is returned.
func (v *versionWalkState) walkLexSortVersions(dir string) error {
	if err := v.ctx.Err(); err != nil {
		return err
	}

	entries, err := buildSortedEntries(v.fileSystem, dir, v.skipdirs)
	if err != nil {
		return err
	}

	for i, entry := range entries {
		if v.checkLimits() {
			// Cancellation takes precedence over the result limit.
			if err := v.ctx.Err(); err != nil {
				return err
			}
			// The limit was hit earlier and there is still at least this
			// entry to process, so mark as truncated.
			v.truncated = true
			return nil
		}

		if err := v.processVersionEntry(entry); err != nil {
			return err
		}

		// After processing an entry, check if we hit the limit.
		if v.pastMax {
			if i+1 < len(entries) {
				// There are more entries in this directory.
				v.truncated = true
			}
			// When this was the last entry, the higher level determines
			// whether anything else is left.
			return nil
		}
	}

	return nil
}
// processVersionEntry dispatches a single entry to the file or directory
// handler, depending on what the entry reports itself to be.
func (v *versionWalkState) processVersionEntry(entry entryWithSortKey) error {
	if !entry.d.IsDir() {
		return v.processVersionFile(entry.path, entry.d)
	}
	return v.processVersionDirectory(entry.path, entry.d)
}
// processVersionDirectory decides what to do with the directory at path:
// prune it, descend into it without reporting it, or hand it to the
// delimiter / non-delimiter handler. Prefix checks use the path with a
// trailing separator; marker checks use the path without it.
func (v *versionWalkState) processVersionDirectory(path string, d fs.DirEntry) error {
	dirPath := path + pathSeparator

	// Nothing below this directory can match the prefix.
	if shouldSkipDirectory(dirPath, v.prefix) {
		return nil
	}

	switch {
	case v.isBeforeMarker(path):
		// Before the marker: only descend when the marker could be inside.
		if !strings.HasPrefix(v.keyMarker, dirPath) {
			return nil
		}
		return v.walkLexSortVersions(path)

	case v.isAtMarker(path):
		// At the marker: descend, but do not report the directory itself.
		v.pastMarker = true
		return v.walkLexSortVersions(path)

	case v.prefix != "" && !strings.HasPrefix(dirPath, v.prefix):
		// The directory is only an ancestor of the prefix; keep descending.
		return v.walkLexSortVersions(path)

	case v.delimiter != "":
		return v.processVersionDirectoryWithDelimiter(path, dirPath, d)
	}

	return v.processVersionDirectoryWithoutDelimiter(path, dirPath, d)
}
// processVersionDirectoryWithDelimiter handles a directory when a delimiter
// is set. A directory whose path equals the prefix has its versions added.
// If the part of the path after the prefix contains the delimiter, the
// directory is offered as a common prefix. The walk continues into the
// directory unless the result limit stops it.
func (v *versionWalkState) processVersionDirectoryWithDelimiter(path, dirPath string, d fs.DirEntry) error {
	// Special case: the directory is exactly the prefix, report it itself.
	if dirPath == v.prefix {
		err := v.addVersions(path, d)
		if err != nil || v.pastMax {
			// err is nil here when only the limit was reached.
			return err
		}
	}

	rest := strings.TrimPrefix(dirPath, v.prefix)
	if head, _, hasDelim := strings.Cut(rest, v.delimiter); hasDelim {
		// The directory creates a common prefix; a true result means the
		// limit stops all further processing.
		if v.handleCommonPrefix(v.prefix + head + v.delimiter) {
			return nil
		}
	}

	return v.walkLexSortVersions(path)
}
// processVersionDirectoryWithoutDelimiter handles a directory when no
// delimiter is set. A directory matching the prefix has its versions added,
// and the walk then continues into it unless the result limit was reached.
func (v *versionWalkState) processVersionDirectoryWithoutDelimiter(path, dirPath string, d fs.DirEntry) error {
	matches := v.prefix == "" || strings.HasPrefix(dirPath, v.prefix)
	if matches {
		err := v.addVersions(path, d)
		if err != nil || v.pastMax {
			// err is nil here when only the limit was reached.
			return err
		}
	}
	return v.walkLexSortVersions(path)
}
// processVersionFile applies the marker and prefix filters to the file at
// path and forwards it to the delimiter or non-delimiter handler. Unlike a
// plain listing, a file equal to the key marker is still processed so that
// its remaining versions can be returned.
func (v *versionWalkState) processVersionFile(path string, d fs.DirEntry) error {
	switch {
	case v.isBeforeMarker(path):
		return nil
	case v.isAtMarker(path):
		// Keep going: the rest of this object's versions is still wanted.
		v.pastMarker = true
	}

	if v.prefix != "" && !strings.HasPrefix(path, v.prefix) {
		return nil
	}

	if v.delimiter == "" {
		return v.processVersionFileWithoutDelimiter(path, d)
	}
	return v.processVersionFileWithDelimiter(path, d)
}
// processVersionFileWithDelimiter handles a file when a delimiter is set. If
// the part of the path after the prefix contains the delimiter, the file is
// offered as a common prefix; otherwise its versions are added.
func (v *versionWalkState) processVersionFileWithDelimiter(path string, d fs.DirEntry) error {
	rest := strings.TrimPrefix(path, v.prefix)
	head, _, hasDelim := strings.Cut(rest, v.delimiter)

	if hasDelim {
		// File contains the delimiter after the prefix: common prefix. The
		// stop signal is not needed here, a file has nothing to descend into.
		v.handleCommonPrefix(v.prefix + head + v.delimiter)
		return nil
	}

	// File doesn't contain the delimiter after the prefix: include it.
	return v.addVersions(path, d)
}
// processVersionFileWithoutDelimiter adds the versions of the file at path
// when no delimiter is set, passing on any error from addVersions.
func (v *versionWalkState) processVersionFileWithoutDelimiter(path string, d fs.DirEntry) error {
	return v.addVersions(path, d)
}
// available returns how many more results fit before max is reached, counting
// object versions, delete markers and common prefixes collected so far.
func (v *versionWalkState) available() int {
	used := len(v.objectVersions) + len(v.delMarkers) + v.cpmap.Len()
	return v.max - used
}
// checkLimits reports whether processing should stop, either because the
// result limit was reached or because the context has an error.
func (v *versionWalkState) checkLimits() bool {
	if v.pastMax {
		return true
	}
	return v.ctx.Err() != nil
}
// isBeforeMarker reports whether path sorts before the key marker. It is
// always false once the walk is past the marker.
func (v *versionWalkState) isBeforeMarker(path string) bool {
	if v.pastMarker {
		return false
	}
	return path < v.keyMarker
}
// isAtMarker reports whether path equals the key marker. It is always false
// once the walk is past the marker.
func (v *versionWalkState) isAtMarker(path string) bool {
	if v.pastMarker {
		return false
	}
	return path == v.keyMarker
}
// addVersions fetches the versions for path through getVersions and appends
// them to the collected results.
//
// With no capacity left it only sets pastMax. ErrSkipObj from getVersions
// drops the entry silently; any other error is wrapped and returned. When
// getVersions reports a truncated result, or the page is full afterwards,
// path becomes the new key marker and pastMax is set; in the truncated case
// the listing is also marked truncated and the next version-id marker is
// recorded.
func (v *versionWalkState) addVersions(path string, d fs.DirEntry) error {
	remaining := v.available()
	if remaining <= 0 {
		v.pastMax = true
		return nil
	}

	res, err := v.getVersions(path, v.versionIdMarker, &v.pastVersionIdMarker, remaining, d)
	switch {
	case err == ErrSkipObj:
		return nil
	case err != nil:
		return fmt.Errorf("versions for object %q: %w", path, err)
	}

	v.objectVersions = append(v.objectVersions, res.ObjectVersions...)
	v.delMarkers = append(v.delMarkers, res.DelMarkers...)

	// Room left and nothing cut short: keep walking.
	if !res.Truncated && v.available() > 0 {
		return nil
	}

	v.pastMax = true
	v.newMarker = path
	if res.Truncated {
		v.truncated = true
		v.newVersionIdMarker = res.NextVersionIdMarker
	}
	return nil
}
// addCommonPrefix records cpref unless the result limit was already reached.
// When the addition uses up the remaining capacity, cpref becomes the new key
// marker and pastMax is set.
func (v *versionWalkState) addCommonPrefix(cpref string) {
	if v.pastMax {
		return
	}

	v.cpmap.Add(cpref)
	if v.available() > 0 {
		return
	}

	v.pastMax = true
	v.newMarker = cpref
}
// handleCommonPrefix applies the marker and limit rules to the common prefix
// cpref and records it when allowed. The result tells the caller whether to
// stop processing: true when the result limit is (or has just been) reached,
// false when the prefix was filtered out by the marker or was added with
// capacity to spare.
func (v *versionWalkState) handleCommonPrefix(cpref string) bool {
	switch {
	case v.isBeforeMarker(cpref):
		// Before the marker: not recorded.
		return false
	case v.isAtMarker(cpref):
		// At the marker: not recorded, but the walk is now past it.
		v.pastMarker = true
		return false
	case v.keyMarker != "" && cpref <= v.keyMarker:
		// Prefix is <= marker although pastMarker is already set.
		return false
	case v.pastMax:
		v.truncated = true
		return true
	}

	v.addCommonPrefix(cpref)
	return v.pastMax
}