Files
versitygw/backend/walk.go
tonyipm 33917ad6f3 feat: implement quicker backend/posix walk algorithm
Rewrite the posix Walk implementation to avoid the extra ReadDir per
directory that was noted as a TODO in the old code.  The new algorithm
holds all traversal state in a walkState struct and uses processDir to
interleave sibling entries in correct S3 lexicographic order without a
second syscall.

Key changes:
prefix optimisation: jump directly into the deepest matching directory
rather than scanning from the root on every call

marker short-circuit: skip entire subtrees that are lexically before
the marker, making paginated listing faster

Co-authored-by: Ben McClelland <ben.mcclelland@versity.com>
2026-03-18 17:39:16 -07:00

653 lines
18 KiB
Go

// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package backend
import (
"context"
"errors"
"fmt"
"io/fs"
"path"
"slices"
"strings"
"syscall"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/versity/versitygw/s3response"
)
const (
pathSeparator = "/"
pathDot = "."
)
// WalkResults is the value returned by Walk: the collected listing entries
// together with the pagination state.
type WalkResults struct {
// CommonPrefixes holds the common prefixes collected during the walk.
CommonPrefixes []types.CommonPrefix
// Objects holds the objects collected during the walk.
Objects []s3response.Object
// Truncated is set when the walk met a further entry after the requested
// maximum number of results had already been collected.
Truncated bool
// NextMarker is the key of the entry that filled the page. It is cleared
// to "" when Truncated is false.
NextMarker string
}
// walkState holds the state needed during directory traversal.
// Walk creates one value per call and passes it to quickWalk, which shares
// it with readDir, readDirEntries and processDir.
type walkState struct {
// Inputs, copied unchanged from the Walk arguments.
ctx context.Context
fileSystem fs.FS
prefix string
delimiter string
marker string
max int32
getObj GetObjFunc
skipdirs []string
// Mutable state
// cpmap collects the common prefixes found so far.
cpmap cpMap
// objects collects the objects found so far.
objects []s3response.Object
// pastMax is set once len(objects)+cpmap.Len() has reached max.
pastMax bool
// newMarker is the key of the entry that caused pastMax to be set.
newMarker string
// truncated is set when another entry is encountered after pastMax.
truncated bool
// walkErr carries an error recorded by the helpers; quickWalk returns it.
walkErr error
}
// GetObjFunc converts a walked entry, identified by its key and directory
// entry, into an s3response.Object. The walk omits the entry when the
// function returns ErrSkipObj and records any other error as a walk failure.
type GetObjFunc func(path string, d fs.DirEntry) (s3response.Object, error)
// ErrSkipObj is the sentinel a GetObjFunc or GetVersionsFunc returns to have
// the current entry left out of the results without failing the walk.
var ErrSkipObj = errors.New("skip this object")
// cpMap is the set of common prefixes gathered during a walk. Each key maps
// to the position at which it was first added, so insertion order can be
// recovered later.
type cpMap map[string]int

// Add records key if it is not already present; a repeated key keeps the
// position it was given the first time.
func (c cpMap) Add(key string) {
	if _, seen := c[key]; seen {
		return
	}
	c[key] = len(c)
}
// Len reports how many distinct common prefixes have been added.
func (c cpMap) Len() int { return len(c) }
// CpArray returns the stored common prefixes as a []types.CommonPrefix,
// placing every prefix at the position recorded when it was first added.
func (c cpMap) CpArray() []types.CommonPrefix {
	out := make([]types.CommonPrefix, len(c))
	for key, pos := range c {
		// Take a per-iteration copy so each element gets its own pointer.
		k := key
		out[pos] = types.CommonPrefix{Prefix: &k}
	}
	return out
}
// Walk traverses fileSystem and produces a result shaped like a list-objects
// response.
//
// prefix, delimiter and marker filter and group the keys; max bounds the
// combined number of objects and common prefixes; getObj turns each walked
// entry into an object; entries whose name appears in skipdirs are ignored.
// A max of 0 yields an empty, non-truncated result without touching the
// file system.
func Walk(ctx context.Context, fileSystem fs.FS, prefix, delimiter, marker string, max int32, getObj GetObjFunc, skipdirs []string) (WalkResults, error) {
	var results WalkResults
	if max == 0 {
		return results, nil
	}

	ws := walkState{
		ctx:        ctx,
		fileSystem: fileSystem,
		prefix:     prefix,
		delimiter:  delimiter,
		marker:     marker,
		max:        max,
		getObj:     getObj,
		skipdirs:   skipdirs,
		cpmap:      cpMap{},
	}
	if err := quickWalk(&ws); err != nil {
		return WalkResults{}, err
	}

	results.CommonPrefixes = ws.cpmap.CpArray()
	results.Objects = ws.objects
	results.Truncated = ws.truncated
	results.NextMarker = ws.newMarker
	return results, nil
}
// WalkVersioningResults is the value returned by WalkVersions: the collected
// versions, delete markers and common prefixes plus the pagination state.
type WalkVersioningResults struct {
// CommonPrefixes holds the common prefixes collected during the walk.
CommonPrefixes []types.CommonPrefix
// ObjectVersions holds the object versions returned by the callback.
ObjectVersions []s3response.ObjectVersion
// DelMarkers holds the delete markers returned by the callback.
DelMarkers []types.DeleteMarkerEntry
// Truncated is set when the walk stopped before visiting every entry.
Truncated bool
// NextMarker is the path at which the walk stopped when Truncated is set.
NextMarker string
// NextVersionIdMarker is the version-id marker reported by the callback
// when it signalled truncation.
NextVersionIdMarker string
}
// ObjVersionFuncResult is what a GetVersionsFunc returns for one walked
// entry.
type ObjVersionFuncResult struct {
// ObjectVersions are appended to the walk's object versions.
ObjectVersions []s3response.ObjectVersion
// DelMarkers are appended to the walk's delete markers.
DelMarkers []types.DeleteMarkerEntry
// NextVersionIdMarker is copied into the walk result when Truncated is set.
NextVersionIdMarker string
// Truncated tells WalkVersions to stop the walk at this entry.
Truncated bool
}
// GetVersionsFunc returns the versions and delete markers for one walked
// entry. WalkVersions passes the entry path, the requested version-id
// marker, a pointer to its pastVersionIdMarker flag, the number of results
// still available (max minus everything collected so far) and the directory
// entry. Returning ErrSkipObj leaves the entry out of the results.
// NOTE(review): presumably the callback sets *pastVersionIdMarker once the
// version-id marker has been passed - confirm against the implementations.
type GetVersionsFunc func(path, versionIdMarker string, pastVersionIdMarker *bool, availableObjCount int, d fs.DirEntry) (*ObjVersionFuncResult, error)
// WalkVersions walks the supplied fs.FS and returns results compatible with
// the ListObjectVersions action response.
//
// prefix and delimiter filter and group the keys, keyMarker and
// versionIdMarker give the position to resume from, and max bounds the
// combined number of versions, delete markers and common prefixes. getObj
// is called for every entry that belongs in the listing; an ErrSkipObj
// result (wrapped or not) leaves the entry out. Entries whose name appears
// in skipdirs are not visited.
//
// The walk always starts at "." and stops early once getObj reports
// truncation or the page has been filled by a common prefix.
func WalkVersions(ctx context.Context, fileSystem fs.FS, prefix, delimiter, keyMarker, versionIdMarker string, max int, getObj GetVersionsFunc, skipdirs []string) (WalkVersioningResults, error) {
	cpmap := cpMap{}
	var objects []s3response.ObjectVersion
	var delMarkers []types.DeleteMarkerEntry

	// With no marker there is nothing to skip, so we start "past" it.
	pastMarker := keyMarker == ""
	pastVersionIdMarker := versionIdMarker == ""

	var nextMarker string
	var nextVersionIdMarker string
	var truncated bool

	// collect asks getObj for the versions of one entry and folds them into
	// the results. kind ("directory" or "file") only labels the error. It
	// returns fs.SkipAll when getObj reports that the page is full.
	collect := func(kind, key string, d fs.DirEntry) error {
		remaining := max - len(objects) - len(delMarkers) - cpmap.Len()
		res, err := getObj(key, versionIdMarker, &pastVersionIdMarker, remaining, d)
		if errors.Is(err, ErrSkipObj) {
			return nil
		}
		if err != nil {
			return fmt.Errorf("%s to object %q: %w", kind, key, err)
		}
		objects = append(objects, res.ObjectVersions...)
		delMarkers = append(delMarkers, res.DelMarkers...)
		if res.Truncated {
			truncated = true
			nextMarker = key
			nextVersionIdMarker = res.NextVersionIdMarker
			return fs.SkipAll
		}
		return nil
	}

	err := walkDirRoot(fileSystem, ".", func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}
		// Ignore the root directory
		if path == "." {
			return nil
		}
		if slices.Contains(skipdirs, d.Name()) {
			return fs.SkipDir
		}

		if !pastMarker {
			if path == keyMarker {
				pastMarker = true
			}
			if path < keyMarker {
				return nil
			}
		}

		if d.IsDir() {
			// If prefix is defined and the directory does not match prefix,
			// do not descend into the directory because nothing will
			// match this prefix. Make sure to append the / at the end of
			// directories since this is implied as a directory path name.
			// If path is a prefix of prefix, then path could still be
			// building to match. So only skip if path isn't a prefix of prefix
			// and prefix isn't a prefix of path.
			if prefix != "" &&
				!strings.HasPrefix(path+"/", prefix) &&
				!strings.HasPrefix(prefix, path+"/") {
				return fs.SkipDir
			}

			// Don't recurse into subdirectories when listing with delimiter.
			if delimiter == "/" &&
				prefix != path+"/" &&
				strings.HasPrefix(path+"/", prefix) {
				cpmap.Add(path + "/")
				return fs.SkipDir
			}

			// Skip ancestor directories of the specified prefix; only process
			// the directory that exactly matches the prefix.
			// At this point we know strings.HasPrefix(prefix, path+"/") holds
			// (i.e. path is an ancestor of the prefix directory). Skip it
			// unless it is the exact prefix directory.
			// Note: WalkVersions always walks from "." (unlike Walk, which
			// narrows the root) because versioning marker semantics require
			// visiting all entries in order, so this guard is needed instead.
			if prefix != "" && strings.HasPrefix(prefix, path+"/") && path+"/" != prefix {
				return nil
			}

			return collect("directory", path, d)
		}

		// If object doesn't have prefix, don't include in results.
		if prefix != "" && !strings.HasPrefix(path, prefix) {
			return nil
		}

		if delimiter == "" {
			// If no delimiter specified, then all files with matching
			// prefix are included in results
			return collect("file", path, d)
		}

		// Since delimiter is specified, we only want results that
		// do not contain the delimiter beyond the prefix. If the
		// delimiter exists past the prefix, then the substring
		// between the prefix and delimiter is part of common prefixes.
		//
		// For example:
		// prefix = A/
		// delimiter = /
		// and objects:
		// A/file
		// A/B/file
		// B/C
		// would return:
		// objects: A/file
		// common prefix: A/B/
		//
		// Note: No objects are included past the common prefix since
		// these are all rolled up into the common prefix.
		// Note: The delimiter can be anything, so we have to operate on
		// the full path without any assumptions on posix directory hierarchy
		// here. Usually the delimiter will be "/", but that's not required.
		suffix := strings.TrimPrefix(path, prefix)
		before, _, found := strings.Cut(suffix, delimiter)
		if !found {
			return collect("file", path, d)
		}

		// Common prefixes are a set, so should not have duplicates.
		// These are abstractly a "directory", so need to include the
		// delimiter at the end.
		cpmap.Add(prefix + before + delimiter)

		// The page is full once versions, delete markers and common prefixes
		// together reach max. Delete markers must be counted here because
		// the budget handed to getObj already subtracts them. A non-positive
		// max has never capped common prefixes on this path; that behavior
		// is left unchanged.
		if max > 0 && len(objects)+len(delMarkers)+cpmap.Len() >= max {
			nextMarker = path
			truncated = true
			return fs.SkipAll
		}
		return nil
	})
	if err != nil {
		return WalkVersioningResults{}, err
	}

	return WalkVersioningResults{
		CommonPrefixes:      cpmap.CpArray(),
		ObjectVersions:      objects,
		DelMarkers:          delMarkers,
		Truncated:           truncated,
		NextMarker:          nextMarker,
		NextVersionIdMarker: nextVersionIdMarker,
	}, nil
}
// readDir lists the directory at path and hands its entries to
// readDirEntries. A cancelled context or a read failure is recorded in
// walkstate.walkErr; a missing path is silently treated as empty.
func readDir(path string, walkstate *walkState) {
	if ctxErr := walkstate.ctx.Err(); ctxErr != nil {
		walkstate.walkErr = ctxErr
		return
	}

	entries, err := fs.ReadDir(walkstate.fileSystem, path)
	switch {
	case err == nil:
		readDirEntries(path, entries, walkstate)
	case errors.Is(err, fs.ErrNotExist), errors.Is(err, syscall.ENOTDIR):
		// Not-found / not-a-directory means either the caller's prefix does
		// not exist on disk or the directory vanished while we were
		// walking. Either way the subtree simply contributes nothing.
	default:
		walkstate.walkErr = fmt.Errorf("readdir %q: %w", path, err)
	}
}
// readDirEntries processes the already-read entries of the directory at
// path, adding objects and common prefixes to walkstate.
//
// The loop stops as soon as an error has been recorded in walkstate.walkErr
// (by this function or by any helper it calls), when the context is
// cancelled, or when another entry is found after the result limit has been
// reached, in which case walkstate.truncated is set.
func readDirEntries(path string, entries []fs.DirEntry, walkstate *walkState) {
	for i := 0; i < len(entries); {
		// A failure recorded by a previous iteration ends the walk: the
		// caller discards all results on error, so further getObj calls
		// and directory reads would be wasted and could overwrite the
		// original error with a later one.
		if walkstate.walkErr != nil {
			return
		}
		if err := walkstate.ctx.Err(); err != nil {
			walkstate.walkErr = err
			return
		}

		entry := entries[i]
		if shouldSkip(entry.Name(), walkstate) {
			i++
			continue
		}

		// The page is already full and here is one more candidate entry.
		if walkstate.pastMax {
			walkstate.truncated = true
			return
		}

		if entry.IsDir() {
			// processDir returns the index of the next unprocessed entry
			// (the first sibling that lexically follows the directory);
			// the directory itself is fully consumed inside processDir,
			// so the index must NOT be incremented after this call.
			i = processDir(entries, i, len(entries), path, walkstate)
			continue
		}

		// Leaf entry: it either rolls up into a common prefix (or is
		// discarded by the prefix filter) or becomes a listed object.
		key := makeFullObjectName(path, entry.Name())
		if key > walkstate.marker && !checkAndMaybeAddCommonPrefix(key, walkstate) {
			addContentEntry(key, entry, walkstate)
		}
		i++
	}
}
// processDir handles the directory entries[currentEntry] found in the
// directory at path and returns the index of the first sibling that has not
// been processed yet.
//
// A directory "d" is listed under the key "d/", so later siblings whose
// names sort before "d/" (for example "d-x") must be emitted first. The
// function therefore first consumes those siblings, recursing for sibling
// directories, and only then offers the directory itself to the result set
// and descends into it.
//
// Processing stops early when an error has been recorded in
// walkstate.walkErr, when the context is cancelled, or when the result
// limit has been reached.
func processDir(
	entries []fs.DirEntry,
	currentEntry int,
	maxEntries int,
	path string,
	walkstate *walkState,
) int {
	dirName := entries[currentEntry].Name()
	currentDirectoryName := dirName + "/"

	// check for later entries that should lexically sort before this directory
	entriesIndex := currentEntry + 1
	for entriesIndex < maxEntries {
		// Stop at the first recorded failure instead of doing more work
		// whose results would be thrown away.
		if walkstate.walkErr != nil {
			return entriesIndex
		}
		if err := walkstate.ctx.Err(); err != nil {
			walkstate.walkErr = err
			return entriesIndex
		}
		if walkstate.pastMax {
			walkstate.truncated = true
			return entriesIndex
		}

		sibling := entries[entriesIndex]
		if sibling.Name() >= currentDirectoryName {
			// this entry does not precede, so no further ones will;
			// resume the directory we were called for
			break
		}

		if shouldSkip(sibling.Name(), walkstate) {
			entriesIndex++
			continue
		}

		if sibling.IsDir() {
			// a preceding directory may itself have preceding siblings
			entriesIndex = processDir(entries, entriesIndex, maxEntries, path, walkstate)
			continue
		}

		// this is a leaf entry so check if it needs to go
		// in common prefixes, if not then add to results list
		key := makeFullObjectName(path, sibling.Name())
		if key > walkstate.marker && !checkAndMaybeAddCommonPrefix(key, walkstate) {
			addContentEntry(key, sibling, walkstate)
		}
		entriesIndex++
	}

	// now read and process the dir we were originally called for
	if walkstate.walkErr != nil {
		return entriesIndex
	}
	if walkstate.pastMax {
		walkstate.truncated = true
		return entriesIndex
	}

	// add directory to common prefixes if required
	fullDirectoryName := makeFullObjectName(path, dirName)
	fullObjectName := fullDirectoryName + "/"
	if fullObjectName > walkstate.marker {
		if checkAndMaybeAddCommonPrefix(fullObjectName, walkstate) {
			// we can cut short here UNLESS the current directory
			// forms part of the prefix - in which case we must keep
			// searching it in case the prefix is below
			if !strings.HasPrefix(walkstate.prefix, fullObjectName) || walkstate.pastMax {
				return entriesIndex
			}
		} else {
			dirobj, err := walkstate.getObj(fullObjectName, entries[currentEntry])
			switch {
			case errors.Is(err, ErrSkipObj):
				// Directory exists in the filesystem but is not an object.
			case err != nil:
				walkstate.walkErr = fmt.Errorf("directory to object %q: %w", fullObjectName, err)
				return entriesIndex
			default:
				walkstate.addObject(dirobj, fullObjectName)
			}
		}
	} else if !(strings.HasPrefix(fullObjectName, walkstate.marker) ||
		strings.HasPrefix(walkstate.marker, fullObjectName)) {
		// this directory sorts at or before the marker, so all of its
		// children do too UNLESS the directory and the marker share a
		// common prefix
		return entriesIndex
	}

	if walkstate.pastMax {
		walkstate.truncated = true
		return entriesIndex
	}

	readDir(fullDirectoryName, walkstate)
	return entriesIndex
}
// makeFullObjectName builds the key for entry inside the directory path.
// Entries of the root directory "." keep their bare name; anything else is
// joined as path + "/" + entry.
func makeFullObjectName(path string, entry string) string {
	if path != "." {
		entry = path + "/" + entry
	}
	return entry
}
// checkAndMaybeAddCommonPrefix decides whether objectName still needs to be
// listed as an object.
//
// It returns true when the name is dealt with here: either it does not
// start with the requested prefix and is dropped, or the delimiter occurs
// after the prefix and the name is rolled up into a common prefix (which is
// only recorded when it sorts after the marker).
//
// It returns false when the caller should add the name to the object
// results itself.
func checkAndMaybeAddCommonPrefix(objectName string, walkstate *walkState,
) bool {
	pfx, delim := walkstate.prefix, walkstate.delimiter

	// outside the requested prefix - discard
	if pfx != "" && !strings.HasPrefix(objectName, pfx) {
		return true
	}

	// without a delimiter nothing is ever grouped
	if delim == "" {
		return false
	}

	head, _, grouped := strings.Cut(strings.TrimPrefix(objectName, pfx), delim)
	if !grouped {
		return false
	}

	// the name contributes to the common prefix list, not the objects
	if commonPrefix := pfx + head + delim; commonPrefix > walkstate.marker {
		walkstate.addCommonPrefix(commonPrefix)
	}
	return true
}
// shouldSkip reports whether name is listed in walkstate.skipdirs.
func shouldSkip(name string, walkstate *walkState) bool {
	return slices.Index(walkstate.skipdirs, name) >= 0
}
// addContentEntry converts the leaf entry to an object via walkstate.getObj
// and appends it to the results.
//
// It returns true when the object was added. It returns false when getObj
// asked for the entry to be skipped (ErrSkipObj, wrapped or not) or failed;
// a failure is additionally recorded in walkstate.walkErr.
func addContentEntry(objectName string, dirEntry fs.DirEntry, walkstate *walkState) bool {
	obj, err := walkstate.getObj(objectName, dirEntry)
	switch {
	case errors.Is(err, ErrSkipObj):
		return false
	case err != nil:
		walkstate.walkErr = fmt.Errorf("file to object %q: %w", objectName, err)
		return false
	}
	walkstate.addObject(obj, objectName)
	return true
}
// addObject appends obj to the results and reports whether the combined
// number of objects and common prefixes has now reached the limit. When it
// has, path is remembered as the new marker and pastMax is raised.
func (w *walkState) addObject(obj s3response.Object, path string) bool {
	w.objects = append(w.objects, obj)

	full := len(w.objects)+w.cpmap.Len() >= int(w.max)
	if full {
		// truncated is deliberately left alone: it is only set once a
		// further entry proves there is more to list.
		w.newMarker = path
		w.pastMax = true
	}
	return full
}
// addCommonPrefix records cpref and reports whether the combined number of
// objects and common prefixes has now reached the limit. When it has, cpref
// is remembered as the new marker and pastMax is raised.
func (w *walkState) addCommonPrefix(cpref string) bool {
	w.cpmap.Add(cpref)

	full := len(w.objects)+w.cpmap.Len() >= int(w.max)
	if full {
		// truncated is deliberately left alone: it is only set once a
		// further entry proves there is more to list.
		w.newMarker = cpref
		w.pastMax = true
	}
	return full
}
// quickWalk runs the traversal described by walkstate and returns the first
// error recorded during it.
//
// When the prefix contains a directory part (everything before its last
// "/") that can be read and is non-empty, the walk starts directly in that
// directory and reuses the entries already read; otherwise it starts at the
// root. On success walkstate.newMarker is cleared unless the result was
// truncated.
func quickWalk(walkstate *walkState) error {
	rootDir := pathDot
	var rootEntries []fs.DirEntry

	// see if we can jump straight into the prefix if it is a valid non-empty directory
	if idx := strings.LastIndex(walkstate.prefix, pathSeparator); idx > 0 {
		prefixDir := walkstate.prefix[:idx]
		// If successful and non-empty, start from prefixDir and reuse entries
		// to avoid a second ReadDir call. Any failure simply falls back to
		// the root walk, which reports real errors itself.
		if entries, err := fs.ReadDir(walkstate.fileSystem, prefixDir); err == nil && len(entries) > 0 {
			rootDir = prefixDir
			rootEntries = entries
		}
	}

	switch {
	case len(rootEntries) == 0:
		readDir(rootDir, walkstate)
	case quickWalkRootSkipped(rootDir, walkstate):
		// The prefix lives inside a skipped directory: list nothing, exactly
		// as the root walk would after refusing to descend into it.
	default:
		quickWalkOfferRoot(rootDir, walkstate)
		if walkstate.walkErr == nil {
			readDirEntries(rootDir, rootEntries, walkstate)
		}
	}

	if walkstate.walkErr != nil {
		return walkstate.walkErr
	}
	if !walkstate.truncated {
		walkstate.newMarker = ""
	}
	return nil
}

// quickWalkRootSkipped reports whether any path component of dir is listed
// in skipdirs. The root walk applies shouldSkip at every level on the way
// down, so the fast path has to check every component, not only the last.
func quickWalkRootSkipped(dir string, walkstate *walkState) bool {
	for d := dir; d != pathDot && d != pathSeparator; d = path.Dir(d) {
		if shouldSkip(path.Base(d), walkstate) {
			return true
		}
	}
	return false
}

// quickWalkOfferRoot mirrors processDir for the fast-path root: when the key
// "rootDir/" sorts after the marker and is neither filtered out nor rolled
// up into a common prefix, the directory itself is offered to getObj.
// Failures are recorded in walkstate.walkErr.
func quickWalkOfferRoot(rootDir string, walkstate *walkState) {
	fullObjectName := rootDir + pathSeparator
	if fullObjectName <= walkstate.marker {
		return
	}
	if checkAndMaybeAddCommonPrefix(fullObjectName, walkstate) {
		return
	}

	rootInfo, err := fs.Stat(walkstate.fileSystem, rootDir)
	if err != nil {
		walkstate.walkErr = fmt.Errorf("stat %q: %w", rootDir, err)
		return
	}

	dirobj, err := walkstate.getObj(fullObjectName, fs.FileInfoToDirEntry(rootInfo))
	switch {
	case errors.Is(err, ErrSkipObj):
		// Directory exists in the filesystem but is not an object.
	case err != nil:
		walkstate.walkErr = fmt.Errorf("directory to object %q: %w", fullObjectName, err)
	default:
		walkstate.addObject(dirobj, fullObjectName)
	}
}