mirror of
https://github.com/versity/versitygw.git
synced 2025-12-23 05:05:16 +00:00
Similar to:
8e18b43116
fix: lex sort order of listobjects backend.Walk
But now the "Versions" walk.
The original backend.WalkVersions function used the native WalkDir and ReadDir
which did not guarantee lexicographic ordering of results for cases where
including directory slash changes the sort order. This caused incorrect
paginated responses because S3 APIs require strict lexicographic ordering
where directories with trailing slashes sort correctly relative to files.
For example, dir1/a.b/ must come before dir1/a/ in the results, but
fs.WalkDir was returning them in filesystem sort order which reversed
the order due to not taking in account the trailing "/".
1006 lines
26 KiB
Go
1006 lines
26 KiB
Go
// Copyright 2023 Versity Software
|
|
// This file is licensed under the Apache License, Version 2.0
|
|
// (the "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
|
|
package backend
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io/fs"
|
|
"slices"
|
|
"sort"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
|
"github.com/versity/versitygw/s3response"
|
|
)
|
|
|
|
type WalkResults struct {
|
|
CommonPrefixes []types.CommonPrefix
|
|
Objects []s3response.Object
|
|
Truncated bool
|
|
NextMarker string
|
|
}
|
|
|
|
type GetObjFunc func(path string, d fs.DirEntry) (s3response.Object, error)
|
|
|
|
var ErrSkipObj = errors.New("skip this object")
|
|
|
|
var (
|
|
pathSeparator = "/"
|
|
pathDot = "."
|
|
)
|
|
|
|
// fakeDirEntry implements fs.DirEntry for implied directories
|
|
type fakeDirEntry struct {
|
|
name string
|
|
isDir bool
|
|
}
|
|
|
|
func (f *fakeDirEntry) Name() string {
|
|
return f.name
|
|
}
|
|
|
|
func (f *fakeDirEntry) IsDir() bool {
|
|
return f.isDir
|
|
}
|
|
|
|
func (f *fakeDirEntry) Type() fs.FileMode {
|
|
if f.isDir {
|
|
return fs.ModeDir
|
|
}
|
|
return 0
|
|
}
|
|
|
|
func (f *fakeDirEntry) Info() (fs.FileInfo, error) {
|
|
return &fakeDirInfo{name: f.name, isDir: f.isDir}, nil
|
|
}
|
|
|
|
// fakeDirInfo implements fs.FileInfo for implied directories
|
|
type fakeDirInfo struct {
|
|
name string
|
|
isDir bool
|
|
}
|
|
|
|
func (f *fakeDirInfo) Name() string {
|
|
return f.name
|
|
}
|
|
|
|
func (f *fakeDirInfo) Size() int64 {
|
|
return 0
|
|
}
|
|
|
|
func (f *fakeDirInfo) Mode() fs.FileMode {
|
|
if f.isDir {
|
|
return fs.ModeDir
|
|
}
|
|
return 0
|
|
}
|
|
|
|
func (f *fakeDirInfo) ModTime() time.Time {
|
|
return time.Time{} // Return zero time for fake directories
|
|
}
|
|
|
|
func (f *fakeDirInfo) IsDir() bool {
|
|
return f.isDir
|
|
}
|
|
|
|
func (f *fakeDirInfo) Sys() any {
|
|
return nil
|
|
}
|
|
|
|
// map to store object common prefixes
|
|
type cpMap map[string]int
|
|
|
|
func (c cpMap) Add(key string) {
|
|
_, ok := c[key]
|
|
if !ok {
|
|
c[key] = len(c)
|
|
}
|
|
}
|
|
|
|
// Len returns the length of the map
|
|
func (c cpMap) Len() int {
|
|
return len(c)
|
|
}
|
|
|
|
// CpArray converts the map into a sorted []types.CommonPrefixes array
|
|
func (c cpMap) CpArray() []types.CommonPrefix {
|
|
commonPrefixes := make([]types.CommonPrefix, c.Len())
|
|
for cp, i := range c {
|
|
pfx := cp
|
|
commonPrefixes[i] = types.CommonPrefix{
|
|
Prefix: &pfx,
|
|
}
|
|
}
|
|
|
|
return commonPrefixes
|
|
}
|
|
|
|
// walkState holds the state needed during directory traversal
|
|
type walkState struct {
|
|
ctx context.Context
|
|
fileSystem fs.FS
|
|
prefix string
|
|
delimiter string
|
|
marker string
|
|
max int32
|
|
getObj GetObjFunc
|
|
skipdirs []string
|
|
|
|
// Mutable state
|
|
cpmap cpMap
|
|
objects []s3response.Object
|
|
pastMarker bool
|
|
pastMax bool
|
|
newMarker string
|
|
truncated bool
|
|
}
|
|
|
|
// entryWithSortKey represents a directory entry with its lexicographic sort key
|
|
type entryWithSortKey struct {
|
|
d fs.DirEntry
|
|
path string
|
|
sortKey string
|
|
}
|
|
|
|
// shouldSkipDirectory checks if a directory should be skipped based on prefix matching
|
|
func shouldSkipDirectory(dirPath, prefix string) bool {
|
|
if prefix == "" {
|
|
return false
|
|
}
|
|
return !strings.HasPrefix(dirPath, prefix) && !strings.HasPrefix(prefix, dirPath)
|
|
}
|
|
|
|
// constructPath builds the full path for a directory entry
|
|
func constructPath(dir, entryName string) string {
|
|
if dir == pathDot {
|
|
return entryName
|
|
}
|
|
return dir + pathSeparator + entryName
|
|
}
|
|
|
|
// createSortKey creates a lexicographic sort key for an entry
|
|
func createSortKey(path string, isDir bool) string {
|
|
if isDir {
|
|
return path + pathSeparator
|
|
}
|
|
return path
|
|
}
|
|
|
|
// buildSortedEntries reads directory entries and sorts them lexicographically
|
|
func buildSortedEntries(fileSystem fs.FS, dir string, skipdirs []string) ([]entryWithSortKey, error) {
|
|
entries, err := fs.ReadDir(fileSystem, dir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var entriesWithKeys []entryWithSortKey
|
|
for _, d := range entries {
|
|
if slices.Contains(skipdirs, d.Name()) {
|
|
continue
|
|
}
|
|
|
|
path := constructPath(dir, d.Name())
|
|
sortKey := createSortKey(path, d.IsDir())
|
|
|
|
entriesWithKeys = append(entriesWithKeys, entryWithSortKey{
|
|
d: d,
|
|
path: path,
|
|
sortKey: sortKey,
|
|
})
|
|
}
|
|
|
|
// Sort by the sort key to ensure proper lexicographic ordering
|
|
sort.Slice(entriesWithKeys, func(i, j int) bool {
|
|
return entriesWithKeys[i].sortKey < entriesWithKeys[j].sortKey
|
|
})
|
|
|
|
return entriesWithKeys, nil
|
|
}
|
|
|
|
// checkLimits returns true if we've hit our limits and should stop processing
|
|
func (w *walkState) checkLimits() bool {
|
|
return w.pastMax || w.ctx.Err() != nil
|
|
}
|
|
|
|
// addObject adds an object to the results and checks if limits are reached
|
|
func (w *walkState) addObject(obj s3response.Object, path string) bool {
|
|
w.objects = append(w.objects, obj)
|
|
if len(w.objects)+w.cpmap.Len() >= int(w.max) {
|
|
w.newMarker = path
|
|
w.pastMax = true
|
|
// Don't set truncated here - wait until we know there are more items
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// addCommonPrefix adds a common prefix and checks if limits are reached
|
|
func (w *walkState) addCommonPrefix(cpref string) bool {
|
|
w.cpmap.Add(cpref)
|
|
if len(w.objects)+w.cpmap.Len() >= int(w.max) {
|
|
w.newMarker = cpref
|
|
w.pastMax = true
|
|
// Don't set truncated here - wait until we know there are more items
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// isBeforeMarker checks if a path is before the current marker
|
|
func (w *walkState) isBeforeMarker(path string) bool {
|
|
return !w.pastMarker && path < w.marker
|
|
}
|
|
|
|
// isAtMarker checks if a path matches the current marker
|
|
func (w *walkState) isAtMarker(path string) bool {
|
|
return !w.pastMarker && path == w.marker
|
|
}
|
|
|
|
// Walk walks the supplied fs.FS and returns results compatible with list
|
|
// objects responses
|
|
func Walk(ctx context.Context, fileSystem fs.FS, prefix, delimiter, marker string, max int32, getObj GetObjFunc, skipdirs []string) (WalkResults, error) {
|
|
// Early return for zero max
|
|
if max == 0 {
|
|
return WalkResults{Truncated: false}, nil
|
|
}
|
|
|
|
state := &walkState{
|
|
ctx: ctx,
|
|
fileSystem: fileSystem,
|
|
prefix: prefix,
|
|
delimiter: delimiter,
|
|
marker: marker,
|
|
max: max,
|
|
getObj: getObj,
|
|
skipdirs: skipdirs,
|
|
cpmap: cpMap{},
|
|
pastMarker: marker == "",
|
|
}
|
|
|
|
root := pathDot
|
|
if strings.Contains(prefix, pathSeparator) {
|
|
if idx := strings.LastIndex(prefix, pathSeparator); idx > 0 {
|
|
root = prefix[:idx]
|
|
}
|
|
}
|
|
|
|
// Special case: if prefix ends with delimiter and refers to an empty single directory,
|
|
// we need to start from parent to find the directory as an entry
|
|
if delimiter != "" && prefix != "" && strings.HasSuffix(prefix, delimiter) {
|
|
prefixDir := strings.TrimSuffix(prefix, delimiter)
|
|
if !strings.Contains(prefixDir, delimiter) {
|
|
// Check if this directory exists and is empty
|
|
if entries, err := fs.ReadDir(fileSystem, prefixDir); err == nil && len(entries) == 0 {
|
|
// Single-level empty directory like "a" with prefix "a/"
|
|
root = pathDot
|
|
}
|
|
}
|
|
}
|
|
|
|
// Handle special case for prefix directories in non-delimiter mode
|
|
if delimiter == "" && prefix != "" && strings.HasSuffix(prefix, pathSeparator) {
|
|
err := state.handlePrefixDirectory(prefix)
|
|
if err != nil {
|
|
return WalkResults{}, err
|
|
}
|
|
}
|
|
|
|
if err := state.walkLexSort(root); err != nil {
|
|
if errors.Is(err, fs.ErrNotExist) || errors.Is(err, syscall.ENOTDIR) {
|
|
return WalkResults{}, nil
|
|
}
|
|
return WalkResults{}, err
|
|
}
|
|
|
|
if !state.truncated {
|
|
state.newMarker = ""
|
|
}
|
|
|
|
return WalkResults{
|
|
CommonPrefixes: state.cpmap.CpArray(),
|
|
Objects: state.objects,
|
|
Truncated: state.truncated,
|
|
NextMarker: state.newMarker,
|
|
}, nil
|
|
}
|
|
|
|
// handlePrefixDirectory handles the special case where we need to include the root directory itself
|
|
func (w *walkState) handlePrefixDirectory(prefix string) error {
|
|
if w.pastMarker || w.marker == "" || prefix <= w.marker {
|
|
if w.marker == "" || prefix < w.marker || prefix == w.marker {
|
|
if prefix != w.marker {
|
|
prefixDir := strings.TrimSuffix(prefix, pathSeparator)
|
|
dirInfo := &fakeDirEntry{name: prefixDir, isDir: true}
|
|
dirobj, err := w.getObj(prefix, dirInfo)
|
|
switch err {
|
|
case ErrSkipObj:
|
|
// Skip this directory
|
|
case nil:
|
|
if w.addObject(dirobj, prefix) {
|
|
return nil
|
|
}
|
|
default:
|
|
return fmt.Errorf("prefix directory to object %q: %w",
|
|
prefix, err)
|
|
}
|
|
}
|
|
if prefix == w.marker {
|
|
w.pastMarker = true
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// walkLexSort performs lexicographically ordered directory traversal
|
|
func (w *walkState) walkLexSort(dir string) error {
|
|
if w.ctx.Err() != nil {
|
|
return w.ctx.Err()
|
|
}
|
|
|
|
entries, err := buildSortedEntries(w.fileSystem, dir, w.skipdirs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for i, entry := range entries {
|
|
if w.checkLimits() {
|
|
return w.ctx.Err()
|
|
}
|
|
|
|
if w.pastMax {
|
|
// We have more entries to process, so mark as truncated
|
|
w.truncated = true
|
|
return nil
|
|
}
|
|
|
|
if err := w.processEntry(entry); err != nil {
|
|
return err
|
|
}
|
|
|
|
// After processing an entry, check if we hit the limit
|
|
if w.pastMax {
|
|
// We hit the limit. Check if there are more entries after this one
|
|
if i+1 < len(entries) {
|
|
// There are more entries in this directory
|
|
w.truncated = true
|
|
} else {
|
|
// This was the last entry in this directory, but there might be more dirs to recurse into
|
|
// Let the higher level determine if we're truly done
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// processEntry processes a single directory entry
|
|
func (w *walkState) processEntry(entry entryWithSortKey) error {
|
|
path := entry.path
|
|
d := entry.d
|
|
|
|
if d.IsDir() {
|
|
return w.processDirectory(path, d)
|
|
}
|
|
return w.processFile(path, d)
|
|
}
|
|
|
|
// processDirectory handles directory processing logic
|
|
func (w *walkState) processDirectory(path string, d fs.DirEntry) error {
|
|
dirPath := path + pathSeparator
|
|
|
|
// Check if we should skip this directory based on prefix
|
|
if shouldSkipDirectory(dirPath, w.prefix) {
|
|
return nil
|
|
}
|
|
|
|
// Handle marker logic for directories
|
|
if w.isBeforeMarker(dirPath) {
|
|
// Before marker - only recurse if marker could be inside
|
|
if strings.HasPrefix(w.marker, dirPath) {
|
|
return w.walkLexSort(path)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if w.isAtMarker(dirPath) {
|
|
// At marker - recurse but don't include as common prefix
|
|
w.pastMarker = true
|
|
return w.walkLexSort(path)
|
|
}
|
|
|
|
// Apply prefix filter
|
|
if w.prefix != "" && !strings.HasPrefix(dirPath, w.prefix) {
|
|
return w.walkLexSort(path)
|
|
}
|
|
|
|
if w.delimiter != "" {
|
|
return w.processDirectoryWithDelimiter(path, dirPath)
|
|
}
|
|
return w.processDirectoryWithoutDelimiter(path, dirPath, d)
|
|
}
|
|
|
|
// processDirectoryWithDelimiter handles directory processing when delimiter is specified
|
|
func (w *walkState) processDirectoryWithDelimiter(path, dirPath string) error {
|
|
// Special case: if the directory path exactly matches the prefix, return it as an object
|
|
if dirPath == w.prefix {
|
|
dirobj, err := w.getObj(dirPath, &fakeDirEntry{name: path, isDir: true})
|
|
switch err {
|
|
case ErrSkipObj:
|
|
// Skip but continue to recurse
|
|
case nil:
|
|
if w.addObject(dirobj, dirPath) {
|
|
return nil
|
|
}
|
|
default:
|
|
return fmt.Errorf("directory to object %q: %w", dirPath, err)
|
|
}
|
|
}
|
|
|
|
// Handle delimiter logic for directories
|
|
suffix := strings.TrimPrefix(dirPath, w.prefix)
|
|
before, _, found := strings.Cut(suffix, w.delimiter)
|
|
|
|
if found {
|
|
// Directory creates a common prefix
|
|
cprefNoDelim := w.prefix + before
|
|
cpref := w.prefix + before + w.delimiter
|
|
|
|
if w.isBeforeMarker(cpref) {
|
|
if w.marker != "" && strings.HasPrefix(w.marker, cprefNoDelim) {
|
|
return w.walkLexSort(path)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if w.isAtMarker(cpref) {
|
|
w.pastMarker = true
|
|
return w.walkLexSort(path)
|
|
}
|
|
|
|
// Skip if this common prefix is <= marker (for when pastMarker is already true)
|
|
if w.marker != "" && cpref <= w.marker {
|
|
return w.walkLexSort(path)
|
|
}
|
|
|
|
if w.pastMax {
|
|
w.truncated = true
|
|
return nil
|
|
}
|
|
|
|
if w.addCommonPrefix(cpref) {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return w.walkLexSort(path)
|
|
}
|
|
|
|
// processDirectoryWithoutDelimiter handles directory processing when no delimiter is specified
|
|
func (w *walkState) processDirectoryWithoutDelimiter(path, dirPath string, d fs.DirEntry) error {
|
|
// Include directory as object if it matches prefix
|
|
if w.prefix == "" || strings.HasPrefix(dirPath, w.prefix) {
|
|
dirobj, err := w.getObj(dirPath, d)
|
|
switch err {
|
|
case ErrSkipObj:
|
|
// Skip but continue to recurse
|
|
case nil:
|
|
if w.addObject(dirobj, dirPath) {
|
|
return nil
|
|
}
|
|
default:
|
|
return fmt.Errorf("directory to object %q: %w", dirPath, err)
|
|
}
|
|
}
|
|
|
|
return w.walkLexSort(path)
|
|
}
|
|
|
|
func (w *walkState) processFile(path string, d fs.DirEntry) error {
|
|
if w.isBeforeMarker(path) {
|
|
return nil
|
|
}
|
|
|
|
if w.isAtMarker(path) {
|
|
w.pastMarker = true
|
|
return nil
|
|
}
|
|
|
|
if w.prefix != "" && !strings.HasPrefix(path, w.prefix) {
|
|
return nil
|
|
}
|
|
|
|
if w.delimiter != "" {
|
|
return w.processFileWithDelimiter(path, d)
|
|
}
|
|
return w.processFileWithoutDelimiter(path, d)
|
|
}
|
|
|
|
// processFileWithDelimiter handles file processing when delimiter is specified
|
|
func (w *walkState) processFileWithDelimiter(path string, d fs.DirEntry) error {
|
|
suffix := strings.TrimPrefix(path, w.prefix)
|
|
before, _, found := strings.Cut(suffix, w.delimiter)
|
|
|
|
if !found {
|
|
// File doesn't contain delimiter after prefix - include it
|
|
obj, err := w.getObj(path, d)
|
|
if err == ErrSkipObj {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("file to object %q: %w", path, err)
|
|
}
|
|
if w.pastMax {
|
|
w.truncated = true
|
|
return nil
|
|
}
|
|
|
|
if w.addObject(obj, path) {
|
|
return nil
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// File contains delimiter after prefix - add to common prefixes
|
|
cprefNoDelim := w.prefix + before
|
|
cpref := w.prefix + before + w.delimiter
|
|
|
|
if w.isBeforeMarker(cpref) {
|
|
if w.marker != "" && strings.HasPrefix(w.marker, cprefNoDelim) {
|
|
return nil
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if w.isAtMarker(cpref) {
|
|
w.pastMarker = true
|
|
return nil
|
|
}
|
|
|
|
// Skip if this common prefix is <= marker (for when pastMarker is already true)
|
|
if w.marker != "" && cpref <= w.marker {
|
|
return nil
|
|
}
|
|
|
|
if w.pastMax {
|
|
w.truncated = true
|
|
return nil
|
|
}
|
|
|
|
w.addCommonPrefix(cpref)
|
|
|
|
return nil
|
|
}
|
|
|
|
// processFileWithoutDelimiter handles file processing when no delimiter is specified
|
|
func (w *walkState) processFileWithoutDelimiter(path string, d fs.DirEntry) error {
|
|
obj, err := w.getObj(path, d)
|
|
if err == ErrSkipObj {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("file to object %q: %w", path, err)
|
|
}
|
|
|
|
w.addObject(obj, path)
|
|
return nil
|
|
}
|
|
|
|
type WalkVersioningResults struct {
|
|
CommonPrefixes []types.CommonPrefix
|
|
ObjectVersions []s3response.ObjectVersion
|
|
DelMarkers []types.DeleteMarkerEntry
|
|
Truncated bool
|
|
NextMarker string
|
|
NextVersionIdMarker string
|
|
}
|
|
|
|
type ObjVersionFuncResult struct {
|
|
ObjectVersions []s3response.ObjectVersion
|
|
DelMarkers []types.DeleteMarkerEntry
|
|
NextVersionIdMarker string
|
|
Truncated bool
|
|
}
|
|
|
|
type GetVersionsFunc func(path, versionIdMarker string, pastVersionIdMarker *bool, availableObjCount int, d fs.DirEntry) (*ObjVersionFuncResult, error)
|
|
|
|
// WalkVersions walks the supplied fs.FS and returns results compatible with
|
|
// ListObjectVersions action response
|
|
func WalkVersions(ctx context.Context, fileSystem fs.FS, prefix, delimiter, keyMarker, versionIdMarker string, max int, getVersions GetVersionsFunc, skipdirs []string) (WalkVersioningResults, error) {
|
|
if max == 0 {
|
|
return WalkVersioningResults{}, nil
|
|
}
|
|
|
|
state := &versionWalkState{
|
|
ctx: ctx,
|
|
fileSystem: fileSystem,
|
|
prefix: prefix,
|
|
delimiter: delimiter,
|
|
keyMarker: keyMarker,
|
|
versionIdMarker: versionIdMarker,
|
|
max: max,
|
|
getVersions: getVersions,
|
|
skipdirs: skipdirs,
|
|
cpmap: cpMap{},
|
|
pastMarker: keyMarker == "",
|
|
pastVersionIdMarker: versionIdMarker == "",
|
|
}
|
|
|
|
// Determine traversal root similar to Walk
|
|
root := pathDot
|
|
if strings.Contains(prefix, pathSeparator) {
|
|
if idx := strings.LastIndex(prefix, pathSeparator); idx > 0 {
|
|
root = prefix[:idx]
|
|
}
|
|
}
|
|
|
|
// Handle special case for prefix directories in non-delimiter mode
|
|
if delimiter == "" && prefix != "" && strings.HasSuffix(prefix, pathSeparator) {
|
|
err := state.handleVersionPrefixDirectory(prefix)
|
|
if err != nil {
|
|
return WalkVersioningResults{}, err
|
|
}
|
|
}
|
|
|
|
if err := state.walkLexSortVersions(root); err != nil {
|
|
if errors.Is(err, fs.ErrNotExist) || errors.Is(err, syscall.ENOTDIR) {
|
|
return WalkVersioningResults{}, nil
|
|
}
|
|
return WalkVersioningResults{}, err
|
|
}
|
|
|
|
if !state.truncated {
|
|
state.newMarker = ""
|
|
}
|
|
|
|
return WalkVersioningResults{
|
|
CommonPrefixes: state.cpmap.CpArray(),
|
|
ObjectVersions: state.objectVersions,
|
|
DelMarkers: state.delMarkers,
|
|
Truncated: state.truncated,
|
|
NextMarker: state.newMarker,
|
|
NextVersionIdMarker: state.newVersionIdMarker,
|
|
}, nil
|
|
}
|
|
|
|
// versionWalkState holds the state needed during version directory traversal
|
|
type versionWalkState struct {
|
|
ctx context.Context
|
|
fileSystem fs.FS
|
|
prefix string
|
|
delimiter string
|
|
keyMarker string
|
|
versionIdMarker string
|
|
max int
|
|
getVersions GetVersionsFunc
|
|
skipdirs []string
|
|
cpmap cpMap
|
|
objectVersions []s3response.ObjectVersion
|
|
delMarkers []types.DeleteMarkerEntry
|
|
pastMarker bool
|
|
pastVersionIdMarker bool
|
|
pastMax bool
|
|
newMarker string
|
|
newVersionIdMarker string
|
|
truncated bool
|
|
}
|
|
|
|
// handleVersionPrefixDirectory handles the special case where we need to include the prefix directory itself
|
|
func (v *versionWalkState) handleVersionPrefixDirectory(prefix string) error {
|
|
prefixDir := strings.TrimSuffix(prefix, pathSeparator)
|
|
name := prefixDir
|
|
if idx := strings.LastIndex(prefixDir, pathSeparator); idx >= 0 {
|
|
name = prefixDir[idx+1:]
|
|
}
|
|
if v.pastMarker || v.keyMarker == "" || prefixDir <= v.keyMarker {
|
|
if v.keyMarker == "" || prefixDir <= v.keyMarker { // equality also allowed to advance marker
|
|
if prefixDir != v.keyMarker { // avoid duplicate if marker points exactly here
|
|
err := v.addVersions(prefixDir, &fakeDirEntry{name: name, isDir: true})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if prefixDir == v.keyMarker {
|
|
v.pastMarker = true
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// walkLexSortVersions performs lexicographically ordered traversal for versions
|
|
func (v *versionWalkState) walkLexSortVersions(dir string) error {
|
|
if v.ctx.Err() != nil {
|
|
return v.ctx.Err()
|
|
}
|
|
|
|
entries, err := buildSortedEntries(v.fileSystem, dir, v.skipdirs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for i, entry := range entries {
|
|
if v.checkLimits() {
|
|
return v.ctx.Err()
|
|
}
|
|
|
|
if v.pastMax {
|
|
// We have more entries to process, so mark as truncated
|
|
v.truncated = true
|
|
return nil
|
|
}
|
|
|
|
if err := v.processVersionEntry(entry); err != nil {
|
|
return err
|
|
}
|
|
|
|
// After processing an entry, check if we hit the limit
|
|
if v.pastMax {
|
|
// We hit the limit. Check if there are more entries after this one
|
|
if i+1 < len(entries) {
|
|
// There are more entries in this directory
|
|
v.truncated = true
|
|
}
|
|
// This was the last entry in this directory, but there might be
|
|
// more dirs to recurse into
|
|
// Let the higher level determine if we're truly done
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (v *versionWalkState) processVersionEntry(entry entryWithSortKey) error {
|
|
path := entry.path
|
|
d := entry.d
|
|
if d.IsDir() {
|
|
return v.processVersionDirectory(path, d)
|
|
}
|
|
return v.processVersionFile(path, d)
|
|
}
|
|
|
|
// processVersionDirectory handles directory processing logic
|
|
func (v *versionWalkState) processVersionDirectory(path string, d fs.DirEntry) error {
|
|
dirPath := path + pathSeparator
|
|
|
|
// Check if we should skip this directory based on prefix
|
|
if shouldSkipDirectory(dirPath, v.prefix) {
|
|
return nil
|
|
}
|
|
|
|
// Handle marker logic for directories
|
|
if v.isBeforeMarker(path) { // path used (no trailing /) consistent with markers
|
|
// Before marker - only recurse if marker could be inside
|
|
if strings.HasPrefix(v.keyMarker, dirPath) {
|
|
return v.walkLexSortVersions(path)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if v.isAtMarker(path) {
|
|
// At marker - recurse but don't include as common prefix
|
|
v.pastMarker = true
|
|
return v.walkLexSortVersions(path)
|
|
}
|
|
|
|
// Apply prefix filter
|
|
if v.prefix != "" && !strings.HasPrefix(dirPath, v.prefix) {
|
|
return v.walkLexSortVersions(path)
|
|
}
|
|
|
|
if v.delimiter != "" {
|
|
return v.processVersionDirectoryWithDelimiter(path, dirPath, d)
|
|
}
|
|
return v.processVersionDirectoryWithoutDelimiter(path, dirPath, d)
|
|
}
|
|
|
|
// processVersionDirectoryWithDelimiter handles directory processing when delimiter is specified
|
|
func (v *versionWalkState) processVersionDirectoryWithDelimiter(path, dirPath string, d fs.DirEntry) error {
|
|
// Special case: if the directory path exactly matches the prefix, return it as an object
|
|
if dirPath == v.prefix {
|
|
if err := v.addVersions(path, d); err != nil {
|
|
return err
|
|
}
|
|
if v.pastMax {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Handle delimiter logic for directories
|
|
suffix := strings.TrimPrefix(dirPath, v.prefix)
|
|
before, _, found := strings.Cut(suffix, v.delimiter)
|
|
|
|
if found {
|
|
// Directory creates a common prefix
|
|
cprefNoDelim := v.prefix + before
|
|
cpref := cprefNoDelim + v.delimiter
|
|
|
|
if v.handleCommonPrefix(cpref) {
|
|
return nil
|
|
}
|
|
|
|
if v.keyMarker != "" && strings.HasPrefix(v.keyMarker, cprefNoDelim) {
|
|
return v.walkLexSortVersions(path)
|
|
}
|
|
}
|
|
|
|
return v.walkLexSortVersions(path)
|
|
}
|
|
|
|
// processVersionDirectoryWithoutDelimiter handles directory processing when no delimiter is specified
|
|
func (v *versionWalkState) processVersionDirectoryWithoutDelimiter(path, dirPath string, d fs.DirEntry) error {
|
|
// Include directory as object if it matches prefix
|
|
if v.prefix == "" || strings.HasPrefix(dirPath, v.prefix) {
|
|
if err := v.addVersions(path, d); err != nil {
|
|
return err
|
|
}
|
|
if v.pastMax {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return v.walkLexSortVersions(path)
|
|
}
|
|
|
|
// processVersionFile handles file processing logic
|
|
func (v *versionWalkState) processVersionFile(path string, d fs.DirEntry) error {
|
|
if v.isBeforeMarker(path) {
|
|
return nil
|
|
}
|
|
|
|
if v.isAtMarker(path) {
|
|
v.pastMarker = true
|
|
// Don't return here - we need to process this object for remaining versions
|
|
}
|
|
|
|
if v.prefix != "" && !strings.HasPrefix(path, v.prefix) {
|
|
return nil
|
|
}
|
|
|
|
if v.delimiter != "" {
|
|
return v.processVersionFileWithDelimiter(path, d)
|
|
}
|
|
return v.processVersionFileWithoutDelimiter(path, d)
|
|
}
|
|
|
|
// processVersionFileWithDelimiter handles file processing when delimiter is specified
|
|
func (v *versionWalkState) processVersionFileWithDelimiter(path string, d fs.DirEntry) error {
|
|
suffix := strings.TrimPrefix(path, v.prefix)
|
|
before, _, found := strings.Cut(suffix, v.delimiter)
|
|
|
|
if !found {
|
|
// File doesn't contain delimiter after prefix - include it
|
|
if err := v.addVersions(path, d); err != nil {
|
|
return err
|
|
}
|
|
if v.pastMax {
|
|
return nil
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// File contains delimiter after prefix - add to common prefixes
|
|
cprefNoDelim := v.prefix + before
|
|
cpref := cprefNoDelim + v.delimiter
|
|
|
|
if v.handleCommonPrefix(cpref) {
|
|
return nil
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// processVersionFileWithoutDelimiter handles file processing when no delimiter is specified
|
|
func (v *versionWalkState) processVersionFileWithoutDelimiter(path string, d fs.DirEntry) error {
|
|
if err := v.addVersions(path, d); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// available returns remaining capacity (objects + delete markers + prefixes)
|
|
// before reaching the max listing size.
|
|
func (v *versionWalkState) available() int {
|
|
return v.max - (len(v.objectVersions) + len(v.delMarkers) + v.cpmap.Len())
|
|
}
|
|
|
|
// checkLimits returns true if we've passed the max or context cancelled.
|
|
func (v *versionWalkState) checkLimits() bool {
|
|
return v.pastMax || v.ctx.Err() != nil
|
|
}
|
|
|
|
// isBeforeMarker determines if path sorts before current key marker.
|
|
func (v *versionWalkState) isBeforeMarker(path string) bool {
|
|
return !v.pastMarker && path < v.keyMarker
|
|
}
|
|
|
|
// isAtMarker determines if path equals the current key marker.
|
|
func (v *versionWalkState) isAtMarker(path string) bool {
|
|
return !v.pastMarker && path == v.keyMarker
|
|
}
|
|
|
|
// addVersions fetches version information for a path and updates state.
|
|
func (v *versionWalkState) addVersions(path string, d fs.DirEntry) error {
|
|
if v.available() <= 0 {
|
|
v.pastMax = true
|
|
return nil
|
|
}
|
|
res, err := v.getVersions(path, v.versionIdMarker, &v.pastVersionIdMarker, v.available(), d)
|
|
if err == ErrSkipObj {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("versions for object %q: %w", path, err)
|
|
}
|
|
v.objectVersions = append(v.objectVersions, res.ObjectVersions...)
|
|
v.delMarkers = append(v.delMarkers, res.DelMarkers...)
|
|
if res.Truncated {
|
|
v.truncated = true
|
|
v.newMarker = path
|
|
v.newVersionIdMarker = res.NextVersionIdMarker
|
|
v.pastMax = true
|
|
return nil
|
|
}
|
|
if v.available() <= 0 {
|
|
v.newMarker = path
|
|
v.pastMax = true
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// addCommonPrefix records a common prefix if capacity allows.
|
|
func (v *versionWalkState) addCommonPrefix(cpref string) {
|
|
if v.pastMax {
|
|
return
|
|
}
|
|
v.cpmap.Add(cpref)
|
|
if v.available() <= 0 {
|
|
v.newMarker = cpref
|
|
v.pastMax = true
|
|
}
|
|
}
|
|
|
|
// handleCommonPrefix processes common prefix logic for version walking
|
|
func (v *versionWalkState) handleCommonPrefix(cpref string) bool {
|
|
if v.isBeforeMarker(cpref) {
|
|
return false // Don't process this prefix
|
|
}
|
|
|
|
if v.isAtMarker(cpref) {
|
|
v.pastMarker = true
|
|
return false // Don't include as common prefix but continue processing
|
|
}
|
|
|
|
// Skip if this common prefix is <= marker (for when pastMarker is already true)
|
|
if v.keyMarker != "" && cpref <= v.keyMarker {
|
|
return false
|
|
}
|
|
|
|
if v.pastMax {
|
|
v.truncated = true
|
|
return true // Stop processing
|
|
}
|
|
|
|
v.addCommonPrefix(cpref)
|
|
return v.pastMax // Return true if we should stop processing
|
|
}
|