mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-14 05:41:29 +00:00
* feat(s3/versioning): grep-able heal logs + scan-anomaly diagnostics + audit cmd Three diagnostic additions on top of #9460, all aimed at making the next production incident faster to triage than the one we just spent hours on. 1. [versioning-heal] grep prefix on every heal-related log line, with a small fixed event vocabulary (produced / surfaced / healed / enqueue / drain / retry / gave_up / anomaly / clear_failed / heal_persist_failed / teardown_failed / queue_full). One grep gives operators a single event stream across the produce-to-drain lifecycle. 2. Escalate the "scanned N>0 entries but no valid latest" case in updateLatestVersionAfterDeletion from V(1) Infof to a Warning that names the orphan entries it saw. This is the listing-after-rm inconsistency signature that pinned down 259064a8's failure — it should not be invisible at default log levels. 3. New weed shell command `s3.versions.audit -prefix <path> [-v] [-heal]` that walks .versions/ directories under a prefix and reports the stranded population. With -heal it clears the latest-version pointer in place on stranded directories so subsequent reads return a clean NoSuchKey instead of replaying the 10-retry self-heal loop. * fix(s3/versioning): audit pagination, exclusive categories, ctx-aware retry Address PR review: 1. s3.versions.audit walked only the first 1024-entry page of each .versions/ directory, false-positiving "stranded" on large dirs. Loop until the page returns < 1024 entries, advancing startName. 2. clean and orphan-only categories double-counted when a directory had no pointer and at least one orphan: incremented both. Make them mutually exclusive so report totals sum to versionsDirs. 3. retryFilerOp's worst-case ~6.3s backoff was a bare time.Sleep, non-interruptible by ctx. A server shutdown / client disconnect would wait out the budget per in-flight delete. 
Thread ctx through deleteSpecificObjectVersion -> repointLatestBeforeDeletion / updateLatestVersionAfterDeletion -> retryFilerOp; backoff now uses a select{<-ctx.Done(), <-timer.C}. HTTP handlers pass r.Context(); gRPC lifecycle handlers pass the stream ctx. New test pins the behavior: cancelling ctx mid-backoff returns ctx.Err() in <500ms instead of blocking ~6.3s. * fix(s3/versioning): clearStale outcome + escape grep-able log fields Two coderabbit follow-ups: 1. Successful pointer clear should suppress `produced`. updateLatestVersionAfterDeletion's transient-rm fallback called clearStaleLatestVersionPointer best-effort, then unconditionally returned retryErr. The caller (deleteSpecificObjectVersion) saw the error and emitted `event=produced` + enqueued the reconciler, even though clearStaleLatestVersionPointer had just driven the pointer to consistency and the next reader would get NoSuchKey via the clean-miss path. Make clearStaleLatestVersionPointer return cleared bool; on success the caller returns nil so neither produced nor the reconciler enqueue fires. Concurrent-writer aborts, re-scan errors, and CAS mismatches still report false so genuinely stranded state keeps surfacing. 2. Escape user-controlled fields in heal log lines. versioningHealInfof / Warningf / Errorf interpolated raw bucket / key / filename / err text into a single-space-separated line. An S3 key (or error string from gRPC) containing whitespace, newlines, or `event=...` could split one event into multiple tokens and spoof fake fields downstream. Sanitize each arg in the helper: safe values pass through; anything with whitespace, quotes, control chars, or backslashes is replaced with its strconv.Quote form. No caller changes — the format strings remain unchanged. Tests pin both behaviors: sanitization table covers the field boundary cases; an end-to-end shape test confirms a key containing `event=spoof` stays inside a single quoted token.
2050 lines
82 KiB
Go
2050 lines
82 KiB
Go
package s3api
|
||
|
||
// This file contains the core S3 versioning operations.
|
||
// Version ID format handling is in s3api_version_id.go
|
||
|
||
import (
|
||
"bytes"
|
||
"context"
|
||
"encoding/hex"
|
||
"encoding/xml"
|
||
"errors"
|
||
"fmt"
|
||
"net/http"
|
||
"path"
|
||
"sort"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||
s3_constants "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||
"google.golang.org/grpc/codes"
|
||
"google.golang.org/grpc/status"
|
||
)
|
||
|
||
// ErrDeleteMarker is returned when the latest version is a delete marker.
// This is an expected, non-fatal condition: callers use it as a sentinel
// (compare with errors.Is) to distinguish "object deleted" from real errors.
var ErrDeleteMarker = errors.New("latest version is a delete marker")
|
||
|
||
// clearCachedVersionMetadata clears only the version metadata fields (not ID/filename).
|
||
// Used by setCachedListMetadata to prevent stale values when updating.
|
||
func clearCachedVersionMetadata(extended map[string][]byte) {
|
||
delete(extended, s3_constants.ExtLatestVersionSizeKey)
|
||
delete(extended, s3_constants.ExtLatestVersionMtimeKey)
|
||
delete(extended, s3_constants.ExtLatestVersionETagKey)
|
||
delete(extended, s3_constants.ExtLatestVersionOwnerKey)
|
||
delete(extended, s3_constants.ExtLatestVersionIsDeleteMarker)
|
||
}
|
||
|
||
// markVersionNoncurrent stamps ExtNoncurrentSinceNsKey on the named entry
|
||
// inside .versions/. Called when a PUT or delete-marker demotes that entry
|
||
// from current to noncurrent so the s3 lifecycle engine can compute
|
||
// NoncurrentDays due time directly from the stamp instead of deriving it
|
||
// from the next-newer sibling's mtime. demotionNs is captured once per
|
||
// demotion event by the caller (typically time.Now().UnixNano()) and
|
||
// passed in so concurrent demotions on the same object don't race for
|
||
// a wall-clock read inside the helper.
|
||
//
|
||
// Idempotent on retries: if the key is already present, it is overwritten
|
||
// with the new value. Out-of-order overwrites are bounded by the caller's
|
||
// single-timestamp-per-event contract.
|
||
func (s3a *S3ApiServer) markVersionNoncurrent(bucketDir, versionsObjectPath, fileName string, demotionNs int64) {
|
||
if fileName == "" || demotionNs <= 0 {
|
||
return
|
||
}
|
||
versionsDir := bucketDir + "/" + versionsObjectPath
|
||
entry, err := s3a.getEntry(versionsDir, fileName)
|
||
if err != nil {
|
||
glog.V(2).Infof("markVersionNoncurrent: skip %s/%s: %v", versionsDir, fileName, err)
|
||
return
|
||
}
|
||
if entry.Extended == nil {
|
||
entry.Extended = make(map[string][]byte)
|
||
}
|
||
entry.Extended[s3_constants.ExtNoncurrentSinceNsKey] = []byte(strconv.FormatInt(demotionNs, 10))
|
||
if err := s3a.updateEntry(versionsDir, entry); err != nil {
|
||
glog.V(2).Infof("markVersionNoncurrent: update %s/%s: %v", versionsDir, fileName, err)
|
||
}
|
||
}
|
||
|
||
// setCachedListMetadata caches list metadata in the .versions directory entry for single-scan efficiency
|
||
func setCachedListMetadata(versionsEntry, versionEntry *filer_pb.Entry) {
|
||
if versionEntry == nil || versionsEntry == nil {
|
||
return
|
||
}
|
||
if versionsEntry.Extended == nil {
|
||
versionsEntry.Extended = make(map[string][]byte)
|
||
}
|
||
|
||
// Clear old cached metadata to prevent stale values
|
||
// Note: We don't use clearCachedListMetadata here because it also clears
|
||
// ExtLatestVersionIdKey and ExtLatestVersionFileNameKey, which are set by the caller
|
||
clearCachedVersionMetadata(versionsEntry.Extended)
|
||
|
||
// Size and Mtime
|
||
if versionEntry.Attributes != nil {
|
||
versionsEntry.Extended[s3_constants.ExtLatestVersionSizeKey] = []byte(strconv.FormatUint(versionEntry.Attributes.FileSize, 10))
|
||
versionsEntry.Extended[s3_constants.ExtLatestVersionMtimeKey] = []byte(strconv.FormatInt(versionEntry.Attributes.Mtime, 10))
|
||
}
|
||
|
||
// ETag, Owner, DeleteMarker from Extended
|
||
if versionEntry.Extended != nil {
|
||
if etag, ok := versionEntry.Extended[s3_constants.ExtETagKey]; ok {
|
||
versionsEntry.Extended[s3_constants.ExtLatestVersionETagKey] = etag
|
||
}
|
||
if owner, ok := versionEntry.Extended[s3_constants.ExtAmzOwnerKey]; ok {
|
||
versionsEntry.Extended[s3_constants.ExtLatestVersionOwnerKey] = owner
|
||
}
|
||
if deleteMarker, ok := versionEntry.Extended[s3_constants.ExtDeleteMarkerKey]; ok {
|
||
versionsEntry.Extended[s3_constants.ExtLatestVersionIsDeleteMarker] = deleteMarker
|
||
} else {
|
||
versionsEntry.Extended[s3_constants.ExtLatestVersionIsDeleteMarker] = []byte("false")
|
||
}
|
||
}
|
||
}
|
||
|
||
// S3ListObjectVersionsResult - Custom struct for S3 list-object-versions response.
// This avoids conflicts with the XSD generated ListVersionsResult struct.
//
// The Entries slice holds Version, DeleteMarker, and CommonPrefix items in their
// correct interleaved sort order (by key ascending, then newest version first).
// Each entry uses a per-element MarshalXML to output the correct XML element name.
// This ensures the XML output matches the S3 API contract where these elements
// are interleaved in sort order, not grouped by type.
type S3ListObjectVersionsResult struct {
	XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListVersionsResult"`

	// Name is the bucket name.
	Name string `xml:"Name,omitempty"`
	Prefix string `xml:"Prefix,omitempty"`
	// KeyMarker / VersionIdMarker echo the request's pagination position.
	KeyMarker string `xml:"KeyMarker,omitempty"`
	VersionIdMarker string `xml:"VersionIdMarker,omitempty"`
	// NextKeyMarker / NextVersionIdMarker tell the client where the next page starts.
	NextKeyMarker string `xml:"NextKeyMarker,omitempty"`
	NextVersionIdMarker string `xml:"NextVersionIdMarker,omitempty"`
	MaxKeys int `xml:"MaxKeys"`
	Delimiter string `xml:"Delimiter,omitempty"`
	IsTruncated bool `xml:"IsTruncated"`

	// Entries holds all versions, delete markers, and common prefixes in their
	// correct interleaved sort order. Each entry's MarshalXML outputs the correct
	// XML element name (<Version>, <DeleteMarker>, or <CommonPrefixes>).
	// MarshalXML on each entry overrides the element name to <Version>,
	// <DeleteMarker>, or <CommonPrefixes> as appropriate.
	Entries []VersionListEntry `xml:",omitempty"`

	EncodingType string `xml:"EncodingType,omitempty"`
}
|
||
|
||
// VersionListEntry represents a single item in the ListObjectVersions response.
// It wraps either a VersionEntry, DeleteMarkerEntry, or PrefixEntry and outputs
// the correct XML element name via custom MarshalXML.
// Exactly one of the three fields must be non-nil (enforced by MarshalXML).
type VersionListEntry struct {
	// Version is set for a concrete object version (<Version> element).
	Version *VersionEntry
	// DeleteMarker is set for a delete marker (<DeleteMarker> element).
	DeleteMarker *DeleteMarkerEntry
	// Prefix is set for a delimiter roll-up (<CommonPrefixes> element).
	Prefix *PrefixEntry
}
|
||
|
||
// MarshalXML outputs the entry as <Version>, <DeleteMarker>, or <CommonPrefixes>
|
||
// depending on which field is populated. Exactly one field must be set.
|
||
func (e VersionListEntry) MarshalXML(enc *xml.Encoder, start xml.StartElement) error {
|
||
var (
|
||
value any
|
||
name string
|
||
count int
|
||
)
|
||
if e.DeleteMarker != nil {
|
||
value = e.DeleteMarker
|
||
name = "DeleteMarker"
|
||
count++
|
||
}
|
||
if e.Prefix != nil {
|
||
value = e.Prefix
|
||
name = "CommonPrefixes"
|
||
count++
|
||
}
|
||
if e.Version != nil {
|
||
value = e.Version
|
||
name = "Version"
|
||
count++
|
||
}
|
||
if count != 1 {
|
||
return fmt.Errorf("VersionListEntry must have exactly one of DeleteMarker, Prefix, or Version set (got %d)", count)
|
||
}
|
||
start.Name.Local = name
|
||
return enc.EncodeElement(value, start)
|
||
}
|
||
|
||
// ObjectVersion represents a version of an S3 object.
// Note: We intentionally do not store the full filer_pb.Entry here to avoid
// retaining large Chunks arrays in memory during list operations.
type ObjectVersion struct {
	// VersionId is the S3 version identifier ("null" for pre-versioning objects).
	VersionId string
	// IsLatest reports whether this is the object's current version.
	IsLatest bool
	// IsDeleteMarker reports whether this version is a delete marker.
	IsDeleteMarker bool
	LastModified time.Time
	// ETag of the version's content, as returned in list responses.
	ETag string
	Size int64
	OwnerID string // Owner ID extracted from entry metadata
	StorageClass string
}
|
||
|
||
// createDeleteMarker creates a delete marker for versioned delete operations.
// It writes a zero-content marker file into the object's .versions/ directory
// and then updates the directory's latest-version metadata to point at the
// marker. Returns the new marker's version ID, or an error if either the
// marker file or the latest-version update fails (in which case nothing is
// rolled back — the marker file may already exist).
func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error) {
	// Clean up the object path first
	cleanObject := strings.TrimPrefix(object, "/")

	// Check if .versions directory exists to determine format
	useInvertedFormat := s3a.getVersionIdFormat(bucket, cleanObject)
	versionId := generateVersionId(useInvertedFormat)

	glog.V(2).Infof("createDeleteMarker: creating delete marker %s for %s/%s (inverted=%v)", versionId, bucket, object, useInvertedFormat)

	// Create the version file name for the delete marker
	versionFileName := s3a.getVersionFileName(versionId)

	// Store delete marker in the .versions directory
	bucketDir := s3a.bucketDir(bucket)
	versionsDir := bucketDir + "/" + cleanObject + s3_constants.VersionsFolder

	// Create the delete marker entry in the .versions directory.
	// The mtime is captured once so the file entry and the cached directory
	// metadata below agree on the marker's timestamp.
	deleteMarkerMtime := time.Now().Unix()
	deleteMarkerExtended := map[string][]byte{
		s3_constants.ExtVersionIdKey:    []byte(versionId),
		s3_constants.ExtDeleteMarkerKey: []byte("true"),
	}

	err := s3a.mkFile(versionsDir, versionFileName, nil, func(entry *filer_pb.Entry) {
		entry.IsDirectory = false
		if entry.Attributes == nil {
			entry.Attributes = &filer_pb.FuseAttributes{}
		}
		entry.Attributes.Mtime = deleteMarkerMtime
		if entry.Extended == nil {
			entry.Extended = make(map[string][]byte)
		}
		for k, v := range deleteMarkerExtended {
			entry.Extended[k] = v
		}
	})
	if err != nil {
		return "", fmt.Errorf("failed to create delete marker in .versions directory: %w", err)
	}

	// Update the .versions directory metadata to indicate this delete marker is the latest version
	// Pass deleteMarkerEntry to cache its metadata for single-scan list efficiency
	deleteMarkerEntry := &filer_pb.Entry{
		Name:        versionFileName,
		IsDirectory: false,
		Attributes: &filer_pb.FuseAttributes{
			Mtime: deleteMarkerMtime,
		},
		Extended: deleteMarkerExtended,
	}
	err = s3a.updateLatestVersionInDirectory(bucket, cleanObject, versionId, versionFileName, deleteMarkerEntry)
	if err != nil {
		glog.Errorf("createDeleteMarker: failed to update latest version in directory: %v", err)
		return "", fmt.Errorf("failed to update latest version in directory: %w", err)
	}

	glog.V(2).Infof("createDeleteMarker: successfully created delete marker %s for %s/%s", versionId, bucket, object)
	return versionId, nil
}
|
||
|
||
// versionListItem represents an item in the unified version/prefix list
// used while building a ListObjectVersions response.
type versionListItem struct {
	// key is the object key (or the common prefix when isPrefix is true).
	key string
	// versionId is empty for prefix items.
	versionId string
	// isPrefix marks delimiter roll-ups (CommonPrefixes) rather than versions.
	isPrefix bool
	versionData interface{} // *VersionEntry or *DeleteMarkerEntry
}
|
||
|
||
// listObjectVersions lists all versions of an object
|
||
func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int) (*S3ListObjectVersionsResult, error) {
|
||
// S3 API limits max-keys to 1000
|
||
if maxKeys > 1000 {
|
||
maxKeys = 1000
|
||
}
|
||
// Pre-allocate with capacity for maxKeys+1 to reduce reallocations
|
||
// The extra 1 is for truncation detection
|
||
allVersions := make([]interface{}, 0, maxKeys+1)
|
||
|
||
glog.V(1).Infof("listObjectVersions: listing versions for bucket %s, prefix '%s', keyMarker '%s', versionIdMarker '%s'", bucket, prefix, keyMarker, versionIdMarker)
|
||
|
||
// Track objects that have been processed to avoid duplicates
|
||
processedObjects := make(map[string]bool)
|
||
|
||
// Track version IDs globally to prevent duplicates throughout the listing
|
||
seenVersionIds := make(map[string]bool)
|
||
|
||
// Map to track common prefixes (deduplicated)
|
||
commonPrefixes := make(map[string]bool)
|
||
|
||
// Recursively find all .versions directories in the bucket
|
||
// Pass keyMarker and versionIdMarker to enable efficient pagination (skip entries before marker)
|
||
bucketPath := s3a.bucketDir(bucket)
|
||
|
||
// Memory optimization: limit collection to maxKeys+1 versions.
|
||
// This works correctly for objects using the NEW inverted-timestamp format, where
|
||
// filesystem order (lexicographic) matches sorted order (newest-first).
|
||
// For OLD format objects (raw timestamps), filesystem order is oldest-first, so
|
||
// limiting collection may return older versions instead of newest. However:
|
||
// - New objects going forward use the new format
|
||
// - The alternative (collecting all) causes memory issues for buckets with many versions
|
||
// - Pagination continues correctly; users can page through to see all versions
|
||
maxCollect := maxKeys + 1 // +1 to detect truncation
|
||
err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix, keyMarker, versionIdMarker, delimiter, commonPrefixes, maxCollect)
|
||
if err != nil {
|
||
glog.Errorf("listObjectVersions: findVersionsRecursively failed: %v", err)
|
||
return nil, err
|
||
}
|
||
|
||
clear(processedObjects)
|
||
clear(seenVersionIds)
|
||
|
||
// Combine versions and prefixes into a single sorted list
|
||
combinedList := s3a.buildSortedCombinedList(allVersions, commonPrefixes)
|
||
glog.V(1).Infof("listObjectVersions: collected %d combined items (versions+prefixes)", len(combinedList))
|
||
|
||
// Apply MaxKeys truncation and determine pagination markers
|
||
truncatedList, nextKeyMarker, nextVersionIdMarker, isTruncated := s3a.truncateAndSetMarkers(combinedList, maxKeys)
|
||
glog.V(1).Infof("listObjectVersions: after truncation - %d items (truncated: %v)", len(truncatedList), isTruncated)
|
||
|
||
// Build the final response by splitting items back into their respective fields
|
||
result := s3a.splitIntoResult(truncatedList, bucket, prefix, keyMarker, versionIdMarker, delimiter, maxKeys, isTruncated, nextKeyMarker, nextVersionIdMarker)
|
||
glog.V(1).Infof("listObjectVersions: final result - %d entries", len(result.Entries))
|
||
|
||
return result, nil
|
||
}
|
||
|
||
// buildSortedCombinedList merges versions and common prefixes into a single list
|
||
// sorted lexicographically by key, with versions preceding prefixes for the same key.
|
||
func (s3a *S3ApiServer) buildSortedCombinedList(allVersions []interface{}, commonPrefixes map[string]bool) []versionListItem {
|
||
combinedList := make([]versionListItem, 0, len(allVersions)+len(commonPrefixes))
|
||
|
||
// Add versions
|
||
for _, version := range allVersions {
|
||
var key, versionId string
|
||
switch v := version.(type) {
|
||
case *VersionEntry:
|
||
key = v.Key
|
||
versionId = v.VersionId
|
||
case *DeleteMarkerEntry:
|
||
key = v.Key
|
||
versionId = v.VersionId
|
||
}
|
||
combinedList = append(combinedList, versionListItem{
|
||
key: key,
|
||
versionId: versionId,
|
||
isPrefix: false,
|
||
versionData: version,
|
||
})
|
||
}
|
||
|
||
// Add common prefixes
|
||
for prefix := range commonPrefixes {
|
||
combinedList = append(combinedList, versionListItem{
|
||
key: prefix,
|
||
isPrefix: true,
|
||
})
|
||
}
|
||
|
||
// Single sort for the entire combined list
|
||
sort.Slice(combinedList, func(i, j int) bool {
|
||
if combinedList[i].key != combinedList[j].key {
|
||
return combinedList[i].key < combinedList[j].key
|
||
}
|
||
// For same key, versions come before prefixes
|
||
if combinedList[i].isPrefix != combinedList[j].isPrefix {
|
||
return !combinedList[i].isPrefix
|
||
}
|
||
// For same key with both being versions, sort by version ID (newest first)
|
||
return compareVersionIds(combinedList[i].versionId, combinedList[j].versionId) < 0
|
||
})
|
||
|
||
return combinedList
|
||
}
|
||
|
||
// truncateAndSetMarkers applies MaxKeys limit and determines pagination markers
|
||
func (s3a *S3ApiServer) truncateAndSetMarkers(combinedList []versionListItem, maxKeys int) (truncated []versionListItem, nextKeyMarker, nextVersionIdMarker string, isTruncated bool) {
|
||
isTruncated = len(combinedList) > maxKeys
|
||
if isTruncated && maxKeys > 0 {
|
||
// Set markers from the last item we'll return
|
||
lastItem := combinedList[maxKeys-1]
|
||
nextKeyMarker = lastItem.key
|
||
if !lastItem.isPrefix {
|
||
nextVersionIdMarker = lastItem.versionId
|
||
}
|
||
// Truncate the list
|
||
combinedList = combinedList[:maxKeys]
|
||
}
|
||
return combinedList, nextKeyMarker, nextVersionIdMarker, isTruncated
|
||
}
|
||
|
||
// splitIntoResult builds the final S3ListObjectVersionsResult from the combined list.
|
||
// It populates a single Entries slice that preserves the interleaved sort order.
|
||
func (s3a *S3ApiServer) splitIntoResult(combinedList []versionListItem, bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int, isTruncated bool, nextKeyMarker, nextVersionIdMarker string) *S3ListObjectVersionsResult {
|
||
result := &S3ListObjectVersionsResult{
|
||
Name: bucket,
|
||
Prefix: prefix,
|
||
KeyMarker: keyMarker,
|
||
VersionIdMarker: versionIdMarker,
|
||
MaxKeys: maxKeys,
|
||
Delimiter: delimiter,
|
||
IsTruncated: isTruncated,
|
||
NextKeyMarker: nextKeyMarker,
|
||
NextVersionIdMarker: nextVersionIdMarker,
|
||
Entries: make([]VersionListEntry, 0, len(combinedList)),
|
||
}
|
||
|
||
for _, item := range combinedList {
|
||
if item.isPrefix {
|
||
result.Entries = append(result.Entries, VersionListEntry{
|
||
Prefix: &PrefixEntry{Prefix: item.key},
|
||
})
|
||
} else {
|
||
switch v := item.versionData.(type) {
|
||
case *VersionEntry:
|
||
result.Entries = append(result.Entries, VersionListEntry{Version: v})
|
||
case *DeleteMarkerEntry:
|
||
result.Entries = append(result.Entries, VersionListEntry{DeleteMarker: v})
|
||
}
|
||
}
|
||
}
|
||
return result
|
||
}
|
||
|
||
// versionCollector holds state for collecting object versions during
// recursive traversal of a bucket's directory tree.
type versionCollector struct {
	s3a *S3ApiServer
	bucket string
	// prefix / keyMarker / versionIdMarker are the request's filter and
	// pagination parameters, applied during the walk.
	prefix string
	keyMarker string
	versionIdMarker string
	// maxCollect caps how many versions+prefixes are gathered; <= 0 means no limit.
	maxCollect int
	// allVersions accumulates *VersionEntry / *DeleteMarkerEntry values.
	allVersions *[]interface{}
	// processedObjects / seenVersionIds dedupe objects and versions across
	// the .versions-directory and regular-file passes.
	processedObjects map[string]bool
	seenVersionIds map[string]bool
	delimiter string
	// commonPrefixes collects delimiter roll-ups (deduplicated by key).
	commonPrefixes map[string]bool
}
|
||
|
||
// isFull returns true if we've collected enough versions
|
||
func (vc *versionCollector) isFull() bool {
|
||
if vc.maxCollect <= 0 {
|
||
return false
|
||
}
|
||
currentCount := len(*vc.allVersions)
|
||
if vc.commonPrefixes != nil {
|
||
currentCount += len(vc.commonPrefixes)
|
||
}
|
||
return currentCount >= vc.maxCollect
|
||
}
|
||
|
||
// matchesPrefixFilter checks if an entry path matches the prefix filter
|
||
func (vc *versionCollector) matchesPrefixFilter(entryPath string, isDirectory bool) bool {
|
||
if vc.prefix == "" {
|
||
return true
|
||
}
|
||
|
||
// Entry matches if its path starts with the prefix
|
||
isMatch := strings.HasPrefix(entryPath, vc.prefix)
|
||
if !isMatch && isDirectory {
|
||
// Directory might match with trailing slash
|
||
isMatch = strings.HasPrefix(entryPath+"/", vc.prefix)
|
||
}
|
||
|
||
// For directories, also check if we need to descend (prefix is deeper)
|
||
canDescend := isDirectory && strings.HasPrefix(vc.prefix, entryPath)
|
||
|
||
return isMatch || canDescend
|
||
}
|
||
|
||
// computeStartFrom extracts the first path component from keyMarker that applies
|
||
// to the given directory level (relativePath), allowing the directory listing to
|
||
// skip directly to the marker position instead of scanning from the beginning.
|
||
// Returns ("", false) when no optimization is possible.
|
||
func (vc *versionCollector) computeStartFrom(relativePath string) (startFrom string, inclusive bool) {
|
||
if vc.keyMarker == "" {
|
||
return "", false
|
||
}
|
||
|
||
var remainder string
|
||
if relativePath == "" {
|
||
remainder = vc.keyMarker
|
||
} else if strings.HasPrefix(vc.keyMarker, relativePath+"/") {
|
||
remainder = vc.keyMarker[len(relativePath)+1:]
|
||
} else {
|
||
return "", false
|
||
}
|
||
|
||
if remainder == "" {
|
||
return "", false
|
||
}
|
||
|
||
if idx := strings.Index(remainder, "/"); idx >= 0 {
|
||
return remainder[:idx], true
|
||
}
|
||
return remainder, true
|
||
}
|
||
|
||
// shouldSkipObjectForMarker returns true if the object should be skipped based on keyMarker
|
||
func (vc *versionCollector) shouldSkipObjectForMarker(objectKey string) bool {
|
||
if vc.keyMarker == "" {
|
||
return false
|
||
}
|
||
return objectKey < vc.keyMarker
|
||
}
|
||
|
||
// shouldSkipVersionForMarker returns true if a version should be skipped based on markers
|
||
// For the keyMarker object, skip versions that are newer than or equal to versionIdMarker
|
||
// (these were already returned in previous pages).
|
||
// Handles both old (raw timestamp) and new (inverted timestamp) version ID formats.
|
||
func (vc *versionCollector) shouldSkipVersionForMarker(objectKey, versionId string) bool {
|
||
if vc.keyMarker == "" || objectKey != vc.keyMarker {
|
||
return false
|
||
}
|
||
// Object matches keyMarker - apply version filtering
|
||
if vc.versionIdMarker == "" {
|
||
// When a keyMarker is provided without a versionIdMarker, S3 pagination
|
||
// starts after the keyMarker object. Returning true here ensures that
|
||
// all versions of the keyMarker object are skipped.
|
||
return true
|
||
}
|
||
// Skip versions that are newer than or equal to versionIdMarker
|
||
// compareVersionIds returns negative if versionId is newer than marker
|
||
// We skip if versionId is newer (negative) or equal (zero) to the marker
|
||
cmp := compareVersionIds(versionId, vc.versionIdMarker)
|
||
return cmp <= 0
|
||
}
|
||
|
||
// addVersion adds a version or delete marker to results
|
||
func (vc *versionCollector) addVersion(version *ObjectVersion, objectKey string) {
|
||
if version.IsDeleteMarker {
|
||
deleteMarker := &DeleteMarkerEntry{
|
||
Key: objectKey,
|
||
VersionId: version.VersionId,
|
||
IsLatest: version.IsLatest,
|
||
LastModified: version.LastModified,
|
||
Owner: vc.s3a.getObjectOwnerFromVersion(version, vc.bucket, objectKey),
|
||
}
|
||
*vc.allVersions = append(*vc.allVersions, deleteMarker)
|
||
} else {
|
||
versionEntry := &VersionEntry{
|
||
Key: objectKey,
|
||
VersionId: version.VersionId,
|
||
IsLatest: version.IsLatest,
|
||
LastModified: version.LastModified,
|
||
ETag: version.ETag,
|
||
Size: version.Size,
|
||
Owner: vc.s3a.getObjectOwnerFromVersion(version, vc.bucket, objectKey),
|
||
StorageClass: StorageClass(vc.s3a.getStorageClassFromExtended(entryExtended(version))),
|
||
}
|
||
*vc.allVersions = append(*vc.allVersions, versionEntry)
|
||
}
|
||
}
|
||
|
||
// processVersionsDirectory handles a .versions directory entry: it loads the
// object's version list and appends each not-yet-seen version, honoring the
// pagination markers and the collection cap. Per-object failures are logged
// and swallowed (returns nil) so the walk continues with other entries.
func (vc *versionCollector) processVersionsDirectory(entryPath string) error {
	objectKey := strings.TrimSuffix(entryPath, s3_constants.VersionsFolder)
	normalizedObjectKey := s3_constants.NormalizeObjectKey(objectKey)

	// Mark as processed (both raw and normalized forms) BEFORE any skip,
	// so the regular-file pass never double-counts this object.
	vc.processedObjects[objectKey] = true
	vc.processedObjects[normalizedObjectKey] = true

	// Skip objects before keyMarker
	if vc.shouldSkipObjectForMarker(normalizedObjectKey) {
		glog.V(4).Infof("processVersionsDirectory: skipping object %s (before keyMarker %s)", normalizedObjectKey, vc.keyMarker)
		return nil
	}

	glog.V(2).Infof("processVersionsDirectory: found object %s", normalizedObjectKey)

	versions, err := vc.s3a.getObjectVersionList(vc.bucket, normalizedObjectKey)
	if err != nil {
		glog.Warningf("processVersionsDirectory: failed to get versions for %s: %v", normalizedObjectKey, err)
		return nil // Continue with other entries
	}

	for _, version := range versions {
		// Stop once the collection cap is reached.
		if vc.isFull() {
			return nil
		}

		// Dedupe on "key:versionId" across the whole listing.
		versionKey := normalizedObjectKey + ":" + version.VersionId
		if vc.seenVersionIds[versionKey] {
			continue
		}

		// Skip versions that were already returned in previous pages
		if vc.shouldSkipVersionForMarker(normalizedObjectKey, version.VersionId) {
			continue
		}

		vc.seenVersionIds[versionKey] = true
		vc.addVersion(version, normalizedObjectKey)
	}

	return nil
}
|
||
|
||
// processExplicitDirectory handles an explicit S3 directory object
|
||
func (vc *versionCollector) processExplicitDirectory(entryPath string, entry *filer_pb.Entry) {
|
||
directoryKey := entryPath
|
||
if !strings.HasSuffix(directoryKey, "/") {
|
||
directoryKey += "/"
|
||
}
|
||
|
||
// Skip directories at or before keyMarker
|
||
if vc.keyMarker != "" && directoryKey <= vc.keyMarker {
|
||
return
|
||
}
|
||
|
||
versionEntry := &VersionEntry{
|
||
Key: directoryKey,
|
||
VersionId: "null",
|
||
IsLatest: true,
|
||
LastModified: time.Unix(entry.Attributes.Mtime, 0),
|
||
ETag: "\"d41d8cd98f00b204e9800998ecf8427e\"", // Empty content ETag
|
||
Size: 0,
|
||
Owner: vc.s3a.getObjectOwnerFromEntry(entry),
|
||
StorageClass: StorageClass(vc.s3a.getStorageClassFromExtended(entry.Extended)),
|
||
}
|
||
*vc.allVersions = append(*vc.allVersions, versionEntry)
|
||
}
|
||
|
||
// processRegularFile handles a regular file entry (a pre-versioning or
// suspended-versioning object), emitting it as a "null" version unless it
// was already covered by a .versions directory or an earlier page.
func (vc *versionCollector) processRegularFile(currentPath, entryPath string, entry *filer_pb.Entry) {
	objectKey := entryPath
	normalizedObjectKey := s3_constants.NormalizeObjectKey(objectKey)

	// Skip files before keyMarker
	if vc.shouldSkipObjectForMarker(normalizedObjectKey) {
		return
	}

	// For keyMarker match, skip if this null version was already returned
	if vc.shouldSkipVersionForMarker(normalizedObjectKey, "null") {
		return
	}

	// Skip if already processed via .versions directory
	if vc.processedObjects[objectKey] || vc.processedObjects[normalizedObjectKey] {
		return
	}

	// Check if this file has version metadata
	hasVersionMeta := entry.Extended != nil && entry.Extended[s3_constants.ExtVersionIdKey] != nil

	// Check if a .versions directory exists for this object
	versionsEntryName := entry.Name + s3_constants.VersionsFolder
	_, versionsErr := vc.s3a.getEntry(currentPath, versionsEntryName)
	if versionsErr == nil && !hasVersionMeta {
		// .versions exists but file has no version metadata - check for null version in .versions.
		// If the null version already lives there, that copy wins and this
		// plain file is suppressed (and remembered as processed).
		versions, err := vc.s3a.getObjectVersionList(vc.bucket, normalizedObjectKey)
		if err == nil {
			for _, v := range versions {
				if v.VersionId == "null" {
					// Null version exists in .versions, skip this file
					vc.processedObjects[objectKey] = true
					vc.processedObjects[normalizedObjectKey] = true
					return
				}
			}
		}
	}

	// Check for duplicate "null" version of this key across the listing
	versionKey := normalizedObjectKey + ":null"
	if vc.seenVersionIds[versionKey] {
		return
	}
	vc.seenVersionIds[versionKey] = true

	// Emit the plain file as the object's latest "null" version.
	versionEntry := &VersionEntry{
		Key:          normalizedObjectKey,
		VersionId:    "null",
		IsLatest:     true,
		LastModified: time.Unix(entry.Attributes.Mtime, 0),
		ETag:         vc.s3a.calculateETagFromChunks(entry.Chunks),
		Size:         int64(entry.Attributes.FileSize),
		Owner:        vc.s3a.getObjectOwnerFromEntry(entry),
		StorageClass: StorageClass(vc.s3a.getStorageClassFromExtended(entry.Extended)),
	}
	*vc.allVersions = append(*vc.allVersions, versionEntry)
}
|
||
|
||
// findVersionsRecursively searches for .versions directories and regular files recursively
|
||
// with efficient pagination support. It skips objects before keyMarker and applies versionIdMarker filtering.
|
||
// maxCollect limits the number of versions to collect for memory efficiency (must be > 0)
|
||
// delimiter and commonPrefixes are used to group keys that share a common prefix
|
||
func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix, keyMarker, versionIdMarker, delimiter string, commonPrefixes map[string]bool, maxCollect int) error {
|
||
vc := &versionCollector{
|
||
s3a: s3a,
|
||
bucket: bucket,
|
||
prefix: prefix,
|
||
keyMarker: keyMarker,
|
||
versionIdMarker: versionIdMarker,
|
||
maxCollect: maxCollect,
|
||
allVersions: allVersions,
|
||
processedObjects: processedObjects,
|
||
seenVersionIds: seenVersionIds,
|
||
delimiter: delimiter,
|
||
commonPrefixes: commonPrefixes,
|
||
}
|
||
|
||
return vc.collectVersions(currentPath, relativePath)
|
||
}
|
||
|
||
// collectVersions recursively collects versions from the given path,
// paginating through filer listings while honoring the collector's
// prefix, keyMarker, and delimiter settings. It returns early (nil)
// whenever isFull reports the maxCollect budget is exhausted; listing
// errors and .versions processing errors are propagated to the caller.
func (vc *versionCollector) collectVersions(currentPath, relativePath string) error {
	startFrom := ""
	inclusive := false
	// On the first iteration, skip ahead to the marker position to avoid
	// re-scanning all entries before the marker on every paginated request.
	if markerStart, ok := vc.computeStartFrom(relativePath); ok && markerStart != "" {
		startFrom = markerStart
		inclusive = true
	}
	for {
		if vc.isFull() {
			return nil
		}

		entries, isLast, err := vc.s3a.list(currentPath, "", startFrom, inclusive, filer.PaginationSize)
		// After the first batch, use exclusive mode for standard pagination
		inclusive = false
		if err != nil {
			return err
		}

		for _, entry := range entries {
			if vc.isFull() {
				return nil
			}
			// Advance the pagination cursor as we go so the next list
			// call resumes after the last entry we actually visited.
			startFrom = entry.Name
			entryPath := path.Join(relativePath, entry.Name)

			if !vc.matchesPrefixFilter(entryPath, entry.IsDirectory) {
				continue
			}

			// Handle special directories that should bypass delimiter logic
			// This ensures .versions directories are processed as version containers
			// rather than being rolled up into CommonPrefixes when a delimiter is used
			if entry.IsDirectory {
				// Skip .uploads directory
				if strings.HasPrefix(entry.Name, s3_constants.MultipartUploadsFolder) {
					continue
				}

				// Handle .versions directory
				if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
					if err := vc.processVersionsDirectory(entryPath); err != nil {
						return err
					}
					continue
				}
			}

			// Group into common prefixes if delimiter is found after the prefix
			if vc.delimiter != "" {
				fullKey := entryPath
				if entry.IsDirectory {
					// Directories participate in delimiter grouping with a
					// trailing slash, matching S3 key semantics.
					fullKey += "/"
				}
				if strings.HasPrefix(fullKey, vc.prefix) {
					remainder := fullKey[len(vc.prefix):]
					if idx := strings.Index(remainder, vc.delimiter); idx >= 0 {
						commonPrefix := vc.prefix + remainder[:idx+len(vc.delimiter)]

						// Add to CommonPrefixes set if it hasn't been returned yet
						if !vc.commonPrefixes[commonPrefix] {
							// Filter by keyMarker to ensure proper pagination behavior
							if vc.keyMarker != "" && commonPrefix <= vc.keyMarker {
								continue
							}
							if vc.isFull() {
								return nil
							}
							vc.commonPrefixes[commonPrefix] = true
						}

						// Skip further processing (recursion or addition) for this entry
						// because it has been rolled up into the CommonPrefix
						continue
					}
				}
			}

			if entry.IsDirectory {
				if err := vc.processDirectory(currentPath, entryPath, entry); err != nil {
					return err
				}
			} else {
				vc.processRegularFile(currentPath, entryPath, entry)
			}
		}

		if isLast {
			break
		}
	}
	return nil
}
|
||
|
||
// processDirectory handles directory entries
|
||
func (vc *versionCollector) processDirectory(currentPath, entryPath string, entry *filer_pb.Entry) error {
|
||
// Handle explicit S3 directory object
|
||
if entry.Attributes.Mime == s3_constants.FolderMimeType {
|
||
vc.processExplicitDirectory(entryPath, entry)
|
||
}
|
||
|
||
// Skip entire subdirectory if all keys within it are before the keyMarker.
|
||
// All object keys under this directory start with entryPath+"/". If the marker
|
||
// doesn't descend into this directory and entryPath+"/" sorts before the marker,
|
||
// then every key in this subtree was already returned in a previous page.
|
||
if vc.keyMarker != "" && !strings.HasPrefix(vc.keyMarker, entryPath+"/") && entryPath+"/" < vc.keyMarker {
|
||
return nil
|
||
}
|
||
|
||
// Recursively search subdirectory
|
||
fullPath := path.Join(currentPath, entry.Name)
|
||
if err := vc.collectVersions(fullPath, entryPath); err != nil {
|
||
glog.Warningf("Error searching subdirectory %s: %v", entryPath, err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// getObjectVersionList returns all versions of a specific object
|
||
// Uses pagination to handle objects with more than 1000 versions
|
||
func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVersion, error) {
|
||
var versions []*ObjectVersion
|
||
|
||
glog.V(2).Infof("getObjectVersionList: looking for versions of %s/%s in .versions directory", bucket, object)
|
||
|
||
// All versions are now stored in the .versions directory only
|
||
bucketDir := s3a.bucketDir(bucket)
|
||
versionsObjectPath := object + s3_constants.VersionsFolder
|
||
glog.V(2).Infof("getObjectVersionList: checking versions directory %s", versionsObjectPath)
|
||
|
||
// Get the .versions directory entry to read latest version metadata
|
||
versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
|
||
if err != nil {
|
||
// No versions directory exists, return empty list
|
||
glog.V(2).Infof("getObjectVersionList: no versions directory found: %v", err)
|
||
return versions, nil
|
||
}
|
||
|
||
// Get the latest version info from directory metadata
|
||
var latestVersionId string
|
||
if versionsEntry.Extended != nil {
|
||
if latestVersionIdBytes, hasLatestVersionId := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatestVersionId {
|
||
latestVersionId = string(latestVersionIdBytes)
|
||
glog.V(2).Infof("getObjectVersionList: latest version ID from directory metadata: %s", latestVersionId)
|
||
}
|
||
}
|
||
|
||
// Use a map to detect and prevent duplicate version IDs
|
||
seenVersionIds := make(map[string]bool)
|
||
versionsDir := bucketDir + "/" + versionsObjectPath
|
||
|
||
// Paginate through all version files in the .versions directory
|
||
startFrom := ""
|
||
const pageSize = 1000
|
||
totalEntries := 0
|
||
|
||
for {
|
||
entries, isLast, err := s3a.list(versionsDir, "", startFrom, false, pageSize)
|
||
if err != nil {
|
||
glog.Warningf("getObjectVersionList: failed to list version files in %s: %v", versionsDir, err)
|
||
return nil, err
|
||
}
|
||
|
||
totalEntries += len(entries)
|
||
|
||
for i, entry := range entries {
|
||
// Track last entry for pagination
|
||
startFrom = entry.Name
|
||
|
||
if entry.Extended == nil {
|
||
glog.V(2).Infof("getObjectVersionList: entry %d has no Extended metadata, skipping", i)
|
||
continue
|
||
}
|
||
|
||
versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
|
||
if !hasVersionId {
|
||
glog.V(2).Infof("getObjectVersionList: entry %d has no version ID, skipping", i)
|
||
continue
|
||
}
|
||
|
||
versionId := string(versionIdBytes)
|
||
|
||
// Check for duplicate version IDs and skip if already seen
|
||
if seenVersionIds[versionId] {
|
||
glog.Warningf("getObjectVersionList: duplicate version ID %s detected for object %s/%s, skipping", versionId, bucket, object)
|
||
continue
|
||
}
|
||
seenVersionIds[versionId] = true
|
||
|
||
// Check if this version is the latest by comparing with directory metadata
|
||
isLatest := (versionId == latestVersionId)
|
||
|
||
isDeleteMarkerBytes, _ := entry.Extended[s3_constants.ExtDeleteMarkerKey]
|
||
isDeleteMarker := string(isDeleteMarkerBytes) == "true"
|
||
|
||
glog.V(2).Infof("getObjectVersionList: found version %s, isLatest=%v, isDeleteMarker=%v", versionId, isLatest, isDeleteMarker)
|
||
|
||
// Extract owner ID from entry metadata to avoid retaining full Entry with Chunks
|
||
var ownerID string
|
||
if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
|
||
ownerID = string(ownerBytes)
|
||
}
|
||
|
||
version := &ObjectVersion{
|
||
VersionId: versionId,
|
||
IsLatest: isLatest,
|
||
IsDeleteMarker: isDeleteMarker,
|
||
LastModified: time.Unix(entry.Attributes.Mtime, 0),
|
||
OwnerID: ownerID,
|
||
StorageClass: s3a.getStorageClassFromExtended(entry.Extended),
|
||
}
|
||
|
||
if !isDeleteMarker {
|
||
// Try to get ETag from Extended attributes first
|
||
if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag {
|
||
version.ETag = string(etagBytes)
|
||
if !strings.HasPrefix(version.ETag, "\"") {
|
||
version.ETag = "\"" + version.ETag + "\""
|
||
}
|
||
} else {
|
||
// Fallback: calculate ETag from chunks
|
||
version.ETag = s3a.calculateETagFromChunks(entry.Chunks)
|
||
}
|
||
version.Size = int64(entry.Attributes.FileSize)
|
||
}
|
||
|
||
versions = append(versions, version)
|
||
}
|
||
|
||
// Stop if we've reached the last page
|
||
if isLast || len(entries) < pageSize {
|
||
break
|
||
}
|
||
}
|
||
|
||
// Clear map to help GC
|
||
clear(seenVersionIds)
|
||
|
||
// Don't sort here - let the main listObjectVersions function handle sorting consistently
|
||
|
||
glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s (after deduplication from %d entries)", len(versions), bucket, object, totalEntries)
|
||
for i, version := range versions {
|
||
glog.V(2).Infof("getObjectVersionList: version %d: %s (isLatest=%v, isDeleteMarker=%v)", i, version.VersionId, version.IsLatest, version.IsDeleteMarker)
|
||
}
|
||
|
||
return versions, nil
|
||
}
|
||
|
||
// calculateETagFromChunks calculates ETag from file chunks following S3 multipart rules
|
||
// This is a wrapper around filer.ETagChunks that adds quotes for S3 compatibility
|
||
func (s3a *S3ApiServer) calculateETagFromChunks(chunks []*filer_pb.FileChunk) string {
|
||
if len(chunks) == 0 {
|
||
return "\"\""
|
||
}
|
||
|
||
// Use the existing filer ETag calculation and add quotes for S3 compatibility
|
||
etag := filer.ETagChunks(chunks)
|
||
if etag == "" {
|
||
return "\"\""
|
||
}
|
||
return fmt.Sprintf("\"%s\"", etag)
|
||
}
|
||
|
||
// getSpecificObjectVersion retrieves a specific version of an object
|
||
func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId string) (*filer_pb.Entry, error) {
|
||
// Normalize object path to ensure consistency with toFilerPath behavior
|
||
normalizedObject := s3_constants.NormalizeObjectKey(object)
|
||
|
||
if versionId == "" {
|
||
// Get current version
|
||
return s3a.getEntry(s3a.bucketDir(bucket), normalizedObject)
|
||
}
|
||
|
||
if versionId == "null" {
|
||
// "null" version ID refers to pre-versioning objects stored as regular files
|
||
bucketDir := s3a.bucketDir(bucket)
|
||
entry, err := s3a.getEntry(bucketDir, normalizedObject)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("null version object %s not found: %v", normalizedObject, err)
|
||
}
|
||
return entry, nil
|
||
}
|
||
|
||
// Get specific version from .versions directory
|
||
versionsDir := s3a.getVersionedObjectDir(bucket, normalizedObject)
|
||
versionFile := s3a.getVersionFileName(versionId)
|
||
|
||
entry, err := s3a.getEntry(versionsDir, versionFile)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("version %s not found: %v", versionId, err)
|
||
}
|
||
|
||
return entry, nil
|
||
}
|
||
|
||
// deleteSpecificObjectVersion deletes a specific version of an object.
// versionId is required; "null" targets the pre-versioning object stored
// as a regular file. metadataOnly=true skips per-chunk DeleteFile RPCs at
// the filer; only pass true when the live entry's Attributes.TtlSec > 0
// so the volume reclaims chunks on its own. Deletion is idempotent: an
// already-missing version (including one removed concurrently) returns
// nil. ctx bounds the retried filer operations in the repoint/update
// helpers so cancellation aborts their backoff.
func (s3a *S3ApiServer) deleteSpecificObjectVersion(ctx context.Context, bucket, object, versionId string, metadataOnly bool) error {
	// Normalize object path to ensure consistency with toFilerPath behavior
	normalizedObject := s3_constants.NormalizeObjectKey(object)

	if versionId == "" {
		return fmt.Errorf("version ID is required for version-specific deletion")
	}

	if versionId == "null" {
		// Delete "null" version (pre-versioning object stored as regular file)
		bucketDir := s3a.bucketDir(bucket)

		// Check if the object exists
		_, err := s3a.getEntry(bucketDir, normalizedObject)
		if err != nil {
			// Object doesn't exist - this is OK for delete operations (idempotent)
			glog.V(2).Infof("deleteSpecificObjectVersion: null version object %s already deleted or doesn't exist", normalizedObject)
			return nil
		}

		// Delete the regular file
		deleteErr := s3a.rmObject(bucketDir, normalizedObject, !metadataOnly, false)
		if deleteErr != nil {
			// Check if file was already deleted by another process
			if _, checkErr := s3a.getEntry(bucketDir, normalizedObject); checkErr != nil {
				// File doesn't exist anymore, deletion was successful
				return nil
			}
			return fmt.Errorf("failed to delete null version %s: %v", normalizedObject, deleteErr)
		}
		return nil
	}

	versionsDir := s3a.getVersionedObjectDir(bucket, normalizedObject)
	versionFile := s3a.getVersionFileName(versionId)

	// Check if this is the latest version before attempting deletion (for potential metadata update)
	versionsEntry, dirErr := s3a.getEntry(s3a.bucketDir(bucket), normalizedObject+s3_constants.VersionsFolder)
	isLatestVersion := false
	if dirErr == nil && versionsEntry.Extended != nil {
		if latestVersionIdBytes, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest {
			isLatestVersion = (string(latestVersionIdBytes) == versionId)
		}
	}

	// Option 1: when deleting the current latest, repoint the .versions/
	// pointer BEFORE removing the blob. A failure between the two steps
	// then leaves a recoverable orphan blob (the pointer is already
	// consistent, GETs serve the prior version or NoSuchKey correctly)
	// instead of a dangling pointer (which forces every subsequent GET
	// through the 10-retry self-heal path and returns NoSuchKey for
	// objects whose latest version was singleton).
	var (
		prePointerRolled bool
		preWasSingleton  bool
	)
	if isLatestVersion {
		rolled, singleton, prepErr := s3a.repointLatestBeforeDeletion(ctx, bucket, normalizedObject, versionId)
		if prepErr != nil {
			// Surface to the client so they can retry the DELETE. The
			// blob has NOT been removed yet, so retrying is safe.
			return fmt.Errorf("failed to repoint latest version before deleting %s: %w", versionId, prepErr)
		}
		prePointerRolled = rolled
		preWasSingleton = singleton
	}

	// Attempt to delete the version file
	// Note: We don't check if the file exists first to avoid race conditions
	// The deletion operation should be idempotent
	deleteErr := s3a.rm(versionsDir, versionFile, !metadataOnly, false)
	if deleteErr != nil {
		// Check if file was already deleted by another process (race condition handling)
		if _, checkErr := s3a.getEntry(versionsDir, versionFile); checkErr != nil {
			// File doesn't exist anymore, deletion was successful (another thread deleted it)
			glog.V(2).Infof("deleteSpecificObjectVersion: version %s for %s/%s already deleted by another process", versionId, bucket, object)
			return nil
		}
		// File still exists but deletion failed for another reason
		return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr)
	}

	// Blob rm succeeded; reconcile the .versions/ pointer state.
	switch {
	case isLatestVersion && prePointerRolled && preWasSingleton:
		// Pre-roll cleared a singleton pointer. The blob is now gone,
		// so the .versions/ directory should be empty — try to tear it
		// down. Non-recursive: any orphan from older code paths leaves
		// the directory in place for the empty-folder cleaner or our
		// reconciler to handle.
		if rmErr := s3a.rm(s3a.bucketDir(bucket), normalizedObject+s3_constants.VersionsFolder, true, false); rmErr != nil {
			glog.V(2).Infof("deleteSpecificObjectVersion: deferring .versions/ teardown for %s/%s: %v", bucket, normalizedObject, rmErr)
		}
	case isLatestVersion && !prePointerRolled:
		// Multi-version case where another version still exists, or the
		// pre-step decided the pointer was no longer ours to roll. Run
		// the post-deletion reconciliation: it updates the pointer to
		// the new latest and tears down .versions/ when nothing remains.
		if err := s3a.updateLatestVersionAfterDeletion(ctx, bucket, normalizedObject); err != nil {
			// Option 2: surface this so the operator sees the load-bearing
			// failure, and queue the path for the reconciler to retry off
			// the hot path. The blob delete already succeeded, so we don't
			// want to fail the client request (Veeam et al. treat 5xx on
			// DELETE as a hard storage error) — but we MUST drive the
			// pointer to consistency, otherwise the next read pays the
			// self-heal cost.
			versioningHealErrorf("produced", "bucket=%s key=%s reason=update_after_delete_failed err=%v queued_for_reconciler=true", bucket, normalizedObject, err)
			if s3a.versionsHealQueue != nil {
				s3a.versionsHealQueue.Enqueue(bucket, normalizedObject)
			}
		}
	}

	return nil
}
|
||
|
||
// repointLatestBeforeDeletion is the "option 1" pre-step for deleting a
// version that the .versions/ pointer currently names. It scans the
// directory to find the next-newest version excluding the one about to
// be deleted, and either repoints to it (multi-version case) or clears
// the pointer entirely (single-version case). When this returns rolled
// = true the caller should NOT run updateLatestVersionAfterDeletion
// after the blob rm — the pointer is already consistent.
//
// wasSingleton = true indicates the caller cleared a singleton pointer.
// After the caller's blob rm completes, deleteSpecificObjectVersion is
// expected to run the post-deletion .versions/ teardown so the now-empty
// directory entry is removed; doing that teardown inside this function
// (i.e. before the blob rm) always fails with "non-empty folder" because
// the blob is still present.
//
// When this returns rolled = false the .versions/ pointer was not in
// sync with the caller's view (some concurrent writer changed it) and
// the current deletion is no longer touching the latest; the caller
// proceeds with the historical multi-step path which will re-snapshot
// the state inside updateLatestVersionAfterDeletion.
//
// All load-bearing filer ops here are wrapped in retryFilerOp so the
// pre-roll matches the resilience of the post-roll path; without this,
// a transient filer hiccup during the pre-step would cause the caller
// to fall back to the legacy non-atomic flow.
func (s3a *S3ApiServer) repointLatestBeforeDeletion(ctx context.Context, bucket, normalizedObject, versionIdToDelete string) (rolled bool, wasSingleton bool, err error) {
	bucketDir := s3a.bucketDir(bucket)
	versionsObjectPath := normalizedObject + s3_constants.VersionsFolder
	versionsDir := bucketDir + "/" + versionsObjectPath

	// Find the chronologically newest *other* version (excluding the
	// one we're about to delete). For singleton objects this returns
	// nil; for multi-version it returns the previous version.
	var (
		newLatestEntry        *filer_pb.Entry
		newLatestVersionId    string
		newLatestVersionFile  string
		newLatestIsDeleteMark bool
		startFrom             string
	)
	for {
		var (
			entries []*filer_pb.Entry
			isLast  bool
		)
		if listErr := retryFilerOp(ctx, "repointLatestBeforeDeletion.list", func() error {
			var lerr error
			entries, isLast, lerr = s3a.list(versionsDir, "", startFrom, false, filer.PaginationSize)
			return lerr
		}); listErr != nil {
			return false, false, fmt.Errorf("list %s: %w", versionsDir, listErr)
		}
		for _, entry := range entries {
			// Entries without a version-id extended attribute are not
			// version blobs (orphans/stubs); they never win the pick.
			if entry == nil || entry.Extended == nil {
				continue
			}
			vidBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]
			if !ok {
				continue
			}
			vid := string(vidBytes)
			if vid == versionIdToDelete {
				continue
			}
			// compareVersionIds(a, b) < 0 selects a as the newer
			// candidate — same pick rule as updateLatestVersionAfterDeletion.
			if newLatestVersionId == "" || compareVersionIds(vid, newLatestVersionId) < 0 {
				newLatestEntry = entry
				newLatestVersionId = vid
				newLatestVersionFile = entry.Name
				newLatestIsDeleteMark = string(entry.Extended[s3_constants.ExtDeleteMarkerKey]) == "true"
			}
		}
		if isLast || len(entries) == 0 {
			break
		}
		startFrom = entries[len(entries)-1].Name
	}

	// Re-fetch the .versions/ entry so the CAS persist below sees the
	// most recent Extended state. Distinguish NotFound (directory
	// already gone — vacuous consistency, skip the post-step) from
	// transient errors (must surface so the caller aborts before the
	// blob rm — otherwise we'd remove the blob without having updated
	// the pointer, reproducing the very dangling-state this PR fixes).
	var versionsEntry *filer_pb.Entry
	if getErr := retryFilerOp(ctx, "repointLatestBeforeDeletion.getEntry", func() error {
		var gerr error
		versionsEntry, gerr = s3a.getEntry(bucketDir, versionsObjectPath)
		return gerr
	}); getErr != nil {
		if errors.Is(getErr, filer_pb.ErrNotFound) || status.Code(getErr) == codes.NotFound {
			// Directory already gone. Pointer is vacuously consistent.
			return true, false, nil
		}
		return false, false, fmt.Errorf("read .versions entry: %w", getErr)
	}
	// No Extended map means no pointer exists — nothing to roll.
	if versionsEntry.Extended == nil {
		return false, false, nil
	}
	currentLatestIdBytes, hasCurrent := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]
	if !hasCurrent || string(currentLatestIdBytes) != versionIdToDelete {
		// A concurrent writer already moved the pointer; our delete is
		// no longer the latest. Caller falls back to the existing path.
		return false, false, nil
	}

	if newLatestEntry != nil {
		// Multi-version: repoint to the prior latest.
		versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(newLatestVersionId)
		versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(newLatestVersionFile)
		setCachedListMetadata(versionsEntry, newLatestEntry)
		glog.V(2).Infof("repointLatestBeforeDeletion: %s/%s pre-roll to %s (deleteMarker=%v) before deleting %s", bucket, normalizedObject, newLatestVersionId, newLatestIsDeleteMark, versionIdToDelete)
	} else {
		// Singleton: clear the pointer fields. The blob rm and the
		// post-rm .versions/ teardown follow in the caller; if those
		// fail halfway through, the pointer is already absent so
		// reads fall through to a clean NoSuchKey without entering the
		// 10-retry stale-pointer self-heal path.
		delete(versionsEntry.Extended, s3_constants.ExtLatestVersionIdKey)
		delete(versionsEntry.Extended, s3_constants.ExtLatestVersionFileNameKey)
		clearCachedVersionMetadata(versionsEntry.Extended)
		glog.V(2).Infof("repointLatestBeforeDeletion: %s/%s clearing singleton pointer before deleting %s", bucket, normalizedObject, versionIdToDelete)
	}

	// Persist the updated pointer state back onto the .versions entry.
	if mkErr := retryFilerOp(ctx, "repointLatestBeforeDeletion.mkFile", func() error {
		return s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
			updatedEntry.Extended = versionsEntry.Extended
			updatedEntry.Attributes = versionsEntry.Attributes
			updatedEntry.Chunks = versionsEntry.Chunks
		})
	}); mkErr != nil {
		return false, false, fmt.Errorf("persist repointed pointer: %w", mkErr)
	}

	return true, newLatestEntry == nil, nil
}
|
||
|
||
// updateLatestRetryAttempts / updateLatestRetryStep tune the bounded
// retries used when the load-bearing filer ops in
// updateLatestVersionAfterDeletion (and the pre-roll path) fail with
// transient errors. The delay starts at updateLatestRetryStep, doubles
// per attempt, and is capped at updateLatestRetryCap, for a total
// worst-case wall time of ≈ 6.3s before the error propagates.
const (
	updateLatestRetryAttempts = 6
	updateLatestRetryStep     = 100 * time.Millisecond
	updateLatestRetryCap      = 2 * time.Second
)
|
||
|
||
// isRetryableFilerErr reports whether err is worth retrying through
|
||
// retryFilerOp. Terminal conditions return false so the caller surfaces
|
||
// them immediately without the backoff delay or the retry-budget
|
||
// wrapper:
|
||
//
|
||
// - NotFound: the entry genuinely doesn't exist. Retrying won't make
|
||
// it appear, and callers (e.g. repointLatestBeforeDeletion) want
|
||
// to act on this directly.
|
||
// - context.Canceled / DeadlineExceeded: the request was aborted by
|
||
// the client or hit a deadline. Continuing to retry just delays
|
||
// the failure return.
|
||
//
|
||
// Everything else (gRPC Unavailable, transient network errors, filer
|
||
// overload signals, etc.) is treated as retryable.
|
||
func isRetryableFilerErr(err error) bool {
|
||
if err == nil {
|
||
return false
|
||
}
|
||
if errors.Is(err, filer_pb.ErrNotFound) || status.Code(err) == codes.NotFound {
|
||
return false
|
||
}
|
||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||
return false
|
||
}
|
||
return true
|
||
}
|
||
|
||
func retryFilerOp(ctx context.Context, name string, fn func() error) error {
|
||
var lastErr error
|
||
backoff := updateLatestRetryStep
|
||
for attempt := 1; attempt <= updateLatestRetryAttempts; attempt++ {
|
||
err := fn()
|
||
if err == nil {
|
||
if attempt > 1 {
|
||
glog.V(1).Infof("retryFilerOp: %s succeeded on attempt %d", name, attempt)
|
||
}
|
||
return nil
|
||
}
|
||
if !isRetryableFilerErr(err) {
|
||
// Terminal — return raw so callers can errors.Is /
|
||
// status.Code on the unwrapped error and avoid the
|
||
// retry-budget delay.
|
||
return err
|
||
}
|
||
lastErr = err
|
||
if attempt == updateLatestRetryAttempts {
|
||
break
|
||
}
|
||
// Context-aware backoff so a server shutdown / client
|
||
// disconnect cancels the worst-case ~6.3s retry budget
|
||
// immediately instead of blocking the goroutine.
|
||
timer := time.NewTimer(backoff)
|
||
select {
|
||
case <-ctx.Done():
|
||
timer.Stop()
|
||
return ctx.Err()
|
||
case <-timer.C:
|
||
}
|
||
backoff *= 2
|
||
if backoff > updateLatestRetryCap {
|
||
backoff = updateLatestRetryCap
|
||
}
|
||
}
|
||
return fmt.Errorf("%s exhausted %d retries: %w", name, updateLatestRetryAttempts, lastErr)
|
||
}
|
||
|
||
// updateLatestVersionAfterDeletion finds the new latest version after deleting
|
||
// the current latest. The pointer may refer to a delete marker: if a delete
|
||
// marker is chronologically newer than the most recent remaining content
|
||
// version, S3 semantics treat the object as deleted and the pointer must
|
||
// reflect that. Restricting the scan to content versions here would resurrect
|
||
// the object by promoting an older content version over a newer delete marker.
|
||
//
|
||
// All load-bearing filer interactions (list, getEntry, mkFile, rm) are
|
||
// retried with bounded backoff. The function now returns a non-nil error
|
||
// when those retries are exhausted; the caller is expected to surface the
|
||
// failure (log + enqueue for the reconciler) instead of swallowing it,
|
||
// which was the historic behaviour that left dangling pointers in place.
|
||
func (s3a *S3ApiServer) updateLatestVersionAfterDeletion(ctx context.Context, bucket, object string) error {
|
||
bucketDir := s3a.bucketDir(bucket)
|
||
versionsObjectPath := object + s3_constants.VersionsFolder
|
||
versionsDir := bucketDir + "/" + versionsObjectPath
|
||
|
||
glog.V(1).Infof("updateLatestVersionAfterDeletion: updating latest version for %s/%s, listing %s", bucket, object, versionsDir)
|
||
|
||
// Paginate through all remaining entries and keep a running best candidate.
|
||
// A single-shot list would miss the true latest for old-format (raw
|
||
// timestamp) version ids when the directory exceeds one page, since filer
|
||
// order is lexicographic-ascending = oldest-first for that format.
|
||
//
|
||
// orphanSamples captures up to orphanSampleCap names that DID appear in
|
||
// the listing but lacked the version-id extended attribute. These are
|
||
// the smoking-gun diagnostic for the "scanned N>0 but no valid latest"
|
||
// anomaly — they're either filer-listing stale entries (just-deleted
|
||
// records still indexed) or residual stubs from interrupted writes.
|
||
const orphanSampleCap = 8
|
||
var (
|
||
latestVersionEntry *filer_pb.Entry
|
||
latestVersionId string
|
||
latestVersionFileName string
|
||
latestIsDeleteMarker bool
|
||
totalEntries int
|
||
orphanCount int
|
||
orphanSamples []string
|
||
startFrom string
|
||
)
|
||
for {
|
||
var (
|
||
entries []*filer_pb.Entry
|
||
isLast bool
|
||
)
|
||
if err := retryFilerOp(ctx, "updateLatestVersionAfterDeletion.list", func() error {
|
||
var listErr error
|
||
entries, isLast, listErr = s3a.list(versionsDir, "", startFrom, false, filer.PaginationSize)
|
||
return listErr
|
||
}); err != nil {
|
||
glog.Errorf("updateLatestVersionAfterDeletion: failed to list versions in %s: %v", versionsDir, err)
|
||
return fmt.Errorf("failed to list versions: %w", err)
|
||
}
|
||
totalEntries += len(entries)
|
||
if pageEntry, pageId, pageFile, pageDM := selectLatestVersion(entries); pageEntry != nil {
|
||
if latestVersionEntry == nil || compareVersionIds(pageId, latestVersionId) < 0 {
|
||
latestVersionEntry = pageEntry
|
||
latestVersionId = pageId
|
||
latestVersionFileName = pageFile
|
||
latestIsDeleteMarker = pageDM
|
||
}
|
||
}
|
||
// Sample orphan entries (those without ExtVersionIdKey) for
|
||
// post-scan diagnostics. selectLatestVersion already filters them
|
||
// out for the latest-pick; we collect names separately so the
|
||
// anomaly warning has something concrete to point at.
|
||
for _, e := range entries {
|
||
if e == nil {
|
||
continue
|
||
}
|
||
if e.Extended != nil {
|
||
if _, ok := e.Extended[s3_constants.ExtVersionIdKey]; ok {
|
||
continue
|
||
}
|
||
}
|
||
orphanCount++
|
||
if len(orphanSamples) < orphanSampleCap {
|
||
orphanSamples = append(orphanSamples, e.Name)
|
||
}
|
||
}
|
||
if isLast || len(entries) == 0 {
|
||
break
|
||
}
|
||
startFrom = entries[len(entries)-1].Name
|
||
}
|
||
|
||
if totalEntries > 0 && latestVersionEntry == nil {
|
||
// The scan saw entries but none were valid version blobs. This is
|
||
// the listing-after-rm timing anomaly: a just-deleted record is
|
||
// still indexed in the parent's listing but its extended attrs
|
||
// either weren't loaded or the entry itself is mid-removal. The
|
||
// caller will now fall through to the .versions/ teardown +
|
||
// pointer clear, but operators tracking stranded-state production
|
||
// should see this event.
|
||
samplesSummary := "(none)"
|
||
if len(orphanSamples) > 0 {
|
||
samplesSummary = strings.Join(orphanSamples, ",")
|
||
}
|
||
versioningHealWarningf("anomaly", "bucket=%s key=%s scanned=%d orphan_count=%d orphan_samples=%s reason=listing_has_entries_but_none_have_version_id", bucket, object, totalEntries, orphanCount, samplesSummary)
|
||
} else {
|
||
glog.V(1).Infof("updateLatestVersionAfterDeletion: scanned %d entries in %s", totalEntries, versionsDir)
|
||
}
|
||
|
||
// Update the .versions directory metadata
|
||
var versionsEntry *filer_pb.Entry
|
||
if err := retryFilerOp(ctx, "updateLatestVersionAfterDeletion.getEntry", func() error {
|
||
var getErr error
|
||
versionsEntry, getErr = s3a.getEntry(bucketDir, versionsObjectPath)
|
||
return getErr
|
||
}); err != nil {
|
||
return fmt.Errorf("failed to get .versions directory: %w", err)
|
||
}
|
||
|
||
if versionsEntry.Extended == nil {
|
||
versionsEntry.Extended = make(map[string][]byte)
|
||
}
|
||
|
||
if latestVersionEntry != nil {
|
||
// Update metadata to point at the new latest (content version or
|
||
// delete marker — whichever is chronologically newest).
|
||
versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(latestVersionId)
|
||
versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(latestVersionFileName)
|
||
setCachedListMetadata(versionsEntry, latestVersionEntry)
|
||
|
||
glog.V(2).Infof("updateLatestVersionAfterDeletion: new latest version for %s/%s is %s (deleteMarker=%v)", bucket, object, latestVersionId, latestIsDeleteMarker)
|
||
if err := retryFilerOp(ctx, "updateLatestVersionAfterDeletion.mkFile", func() error {
|
||
return s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
|
||
updatedEntry.Extended = versionsEntry.Extended
|
||
updatedEntry.Attributes = versionsEntry.Attributes
|
||
updatedEntry.Chunks = versionsEntry.Chunks
|
||
})
|
||
}); err != nil {
|
||
return fmt.Errorf("failed to update .versions directory metadata: %w", err)
|
||
}
|
||
} else {
|
||
// No version-tagged entries remain - try to delete the .versions
|
||
// directory. rm is non-recursive, so any stray non-version entries
|
||
// (orphan files from older code paths or interrupted writes that left
|
||
// behind a v_<id> file without the version-id extended attribute)
|
||
// will cause rm to fail.
|
||
//
|
||
// In that case, clear the stale latest-version pointer on the
|
||
// .versions directory entry so subsequent reads return a clean
|
||
// "object absent" instead of repeatedly chasing a missing version
|
||
// file through getLatestObjectVersion's retry loop and the self-heal
|
||
// path on every request. The orphan files remain in the directory
|
||
// (an operator can remove them); from the S3 API perspective, the
|
||
// object is correctly absent.
|
||
glog.V(2).Infof("updateLatestVersionAfterDeletion: no versions left for %s/%s, deleting .versions directory", bucket, object)
|
||
|
||
rmErr := s3a.rm(bucketDir, versionsObjectPath, true, false)
|
||
if rmErr == nil {
|
||
return nil
|
||
}
|
||
// Two ways rm can fail here: "non-empty folder" (orphan entries
|
||
// blocking the teardown — fall through to pointer clear) and a
|
||
// transient filer error (worth retrying). Distinguish by the
|
||
// canonical error substring; if we can't tell, treat as transient.
|
||
if strings.Contains(rmErr.Error(), filer.MsgFailDelNonEmptyFolder) {
|
||
glog.V(2).Infof("updateLatestVersionAfterDeletion: .versions/ for %s/%s still has orphan entries: %v", bucket, object, rmErr)
|
||
s3a.clearStaleLatestVersionPointer(bucket, object, bucketDir, versionsObjectPath, versionsEntry, "updateLatestVersionAfterDeletion")
|
||
return nil
|
||
}
|
||
// Transient — retry the rm a few times before giving up. Even
|
||
// if it ultimately fails, we still clear the stale pointer so
|
||
// readers get a clean miss; the directory can be tidied by the
|
||
// reconciler later.
|
||
retryErr := retryFilerOp(ctx, "updateLatestVersionAfterDeletion.rm", func() error {
|
||
return s3a.rm(bucketDir, versionsObjectPath, true, false)
|
||
})
|
||
if retryErr == nil {
|
||
return nil
|
||
}
|
||
if strings.Contains(retryErr.Error(), filer.MsgFailDelNonEmptyFolder) {
|
||
s3a.clearStaleLatestVersionPointer(bucket, object, bucketDir, versionsObjectPath, versionsEntry, "updateLatestVersionAfterDeletion")
|
||
return nil
|
||
}
|
||
versioningHealWarningf("teardown_failed", "bucket=%s key=%s err=%v (fell through to clearStale)", bucket, object, retryErr)
|
||
if s3a.clearStaleLatestVersionPointer(bucket, object, bucketDir, versionsObjectPath, versionsEntry, "updateLatestVersionAfterDeletion") {
|
||
// Pointer is consistent again; reader will get NoSuchKey via
|
||
// the clean-miss path. Don't emit `produced` or enqueue the
|
||
// reconciler — there's no stranded state left to heal.
|
||
return nil
|
||
}
|
||
return fmt.Errorf("delete .versions directory: %w", retryErr)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// clearStaleLatestVersionPointer best-effort clears the latest-version
// pointer on a .versions directory entry when no recoverable version
// remains and the directory cannot be removed (e.g. because of orphan
// entries that lack the version-id extended attribute).
//
// The persist is CAS-style to avoid wiping a pointer that a concurrent
// writer just promoted between the caller's snapshot and this function:
//
//  1. Re-scan .versions for any version-id-tagged entry. If one now
//     exists, abort - either the concurrent writer already updated the
//     pointer or the next read's self-heal will pick up the new entry.
//  2. Re-fetch the live .versions directory entry and require its
//     latest-pointer fields to still match the stale id observed by the
//     caller. If they have changed, abort.
//  3. Persist with mkFile using the live Extended map (with the two
//     pointer fields and cached metadata removed) so any other Extended
//     fields written concurrently between (2) and the persist are
//     preserved.
//
// caller is the source-function name used in log lines so operators can
// trace which path ran the clear.
//
// Returns cleared=true ONLY when this function successfully removed the
// pointer (or found the directory already in the intended clean-miss
// state — directory gone, no Extended map, or pointer fields already
// absent — which counts as already-clear). Concurrent-writer aborts,
// re-scan errors, and CAS mismatches return false so callers that hit a
// transient teardown failure still emit `produced` and enqueue for the
// reconciler; a successful clear lets the caller short-circuit and
// return nil since the pointer is consistent and the next reader gets
// NoSuchKey via the clean-miss path.
func (s3a *S3ApiServer) clearStaleLatestVersionPointer(bucket, object, bucketDir, versionsObjectPath string, versionsEntry *filer_pb.Entry, caller string) (cleared bool) {
	// Without a snapshot (or its Extended map) there is no observed stale
	// id to CAS against, so refuse to clear anything.
	if versionsEntry == nil || versionsEntry.Extended == nil {
		return false
	}
	observedStaleId := string(versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey])
	versionsDir := bucketDir + "/" + versionsObjectPath

	// Step 1: paginate the whole .versions directory. Abort on the first
	// page containing any version-id-tagged entry — a concurrent writer
	// has (or will have) a valid pointer, so clearing would lose data.
	// A list failure also aborts: we cannot prove the directory is empty
	// of tagged versions, so the pointer is left untouched.
	startFrom := ""
	for {
		entries, isLast, listErr := s3a.list(versionsDir, "", startFrom, false, filer.PaginationSize)
		if listErr != nil {
			glog.Warningf("%s: re-scan failed for %s/%s, leaving pointer untouched: %v", caller, bucket, object, listErr)
			return false
		}
		if pageEntry, _, _, _ := selectLatestVersion(entries); pageEntry != nil {
			glog.V(1).Infof("%s: skipping pointer clear for %s/%s, concurrent writer added a tagged version", caller, bucket, object)
			return false
		}
		if isLast || len(entries) == 0 {
			break
		}
		// Advance pagination past the last entry of this page.
		startFrom = entries[len(entries)-1].Name
	}

	// Step 2: re-fetch the live directory entry so the persist below uses
	// current Extended contents rather than the caller's stale snapshot.
	liveEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
	if err != nil {
		// Directory was concurrently removed - reader will get NoSuchKey
		// via the clean-miss path; pointer is effectively cleared.
		return true
	}
	if liveEntry.Extended == nil {
		// No extended attributes at all means no pointer exists — the
		// directory is already in the clean-miss state.
		return true
	}
	currentIdBytes, hasId := liveEntry.Extended[s3_constants.ExtLatestVersionIdKey]
	_, hasFile := liveEntry.Extended[s3_constants.ExtLatestVersionFileNameKey]
	if !hasId && !hasFile {
		// Already cleared by another path.
		return true
	}
	// CAS check: if the live pointer no longer matches the id the caller
	// observed, a concurrent writer updated it — abort rather than wipe.
	if observedStaleId != "" && string(currentIdBytes) != observedStaleId {
		glog.V(1).Infof("%s: skipping pointer clear for %s/%s, live pointer changed (observed=%s, current=%s)", caller, bucket, object, observedStaleId, string(currentIdBytes))
		return false
	}

	// Step 3: remove the two pointer fields plus cached list metadata from
	// the LIVE Extended map (preserving any other concurrently-written
	// fields) and persist.
	delete(liveEntry.Extended, s3_constants.ExtLatestVersionIdKey)
	delete(liveEntry.Extended, s3_constants.ExtLatestVersionFileNameKey)
	clearCachedVersionMetadata(liveEntry.Extended)
	if mkErr := s3a.mkFile(bucketDir, versionsObjectPath, liveEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
		updatedEntry.Extended = liveEntry.Extended
		updatedEntry.Attributes = liveEntry.Attributes
		updatedEntry.Chunks = liveEntry.Chunks
	}); mkErr != nil {
		versioningHealWarningf("clear_failed", "bucket=%s key=%s caller=%s err=%v", bucket, object, caller, mkErr)
		return false
	}
	versioningHealInfof("healed", "bucket=%s key=%s mode=pointer_cleared caller=%s (orphan entries remain in .versions directory)", bucket, object, caller)
	return true
}
|
||
|
||
// ListObjectVersionsHandler handles the list object versions request
|
||
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html
|
||
func (s3a *S3ApiServer) ListObjectVersionsHandler(w http.ResponseWriter, r *http.Request) {
|
||
bucket, _ := s3_constants.GetBucketAndObject(r)
|
||
glog.V(3).Infof("ListObjectVersionsHandler %s", bucket)
|
||
|
||
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
|
||
s3err.WriteErrorResponse(w, r, err)
|
||
return
|
||
}
|
||
|
||
// Parse query parameters
|
||
query := r.URL.Query()
|
||
originalPrefix := query.Get("prefix") // Keep original prefix for response
|
||
prefix := strings.TrimPrefix(originalPrefix, "/")
|
||
// Note: prefix is used for filtering relative to bucket root, so no leading slash needed
|
||
|
||
keyMarker := query.Get("key-marker")
|
||
versionIdMarker := query.Get("version-id-marker")
|
||
delimiter := query.Get("delimiter")
|
||
|
||
maxKeysStr := query.Get("max-keys")
|
||
maxKeys := 1000
|
||
if maxKeysStr != "" {
|
||
if mk, err := strconv.Atoi(maxKeysStr); err == nil && mk > 0 {
|
||
maxKeys = mk
|
||
}
|
||
}
|
||
|
||
// List versions
|
||
result, err := s3a.listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter, maxKeys)
|
||
if err != nil {
|
||
glog.Errorf("ListObjectVersionsHandler: %v", err)
|
||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||
return
|
||
}
|
||
|
||
// Set the original prefix in the response (not the normalized internal prefix)
|
||
result.Prefix = originalPrefix
|
||
|
||
glog.V(3).Infof("ListObjectVersionsHandler response: %+v", result)
|
||
writeSuccessResponseXML(w, r, result)
|
||
}
|
||
|
||
// getLatestObjectVersion finds the latest version of an object by reading .versions directory metadata
|
||
func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb.Entry, error) {
|
||
return s3a.doGetLatestObjectVersion(bucket, object, 8)
|
||
}
|
||
|
||
func (s3a *S3ApiServer) doGetLatestObjectVersion(bucket, object string, maxRetries int) (*filer_pb.Entry, error) {
|
||
// Normalize object path to ensure consistency with toFilerPath behavior
|
||
normalizedObject := s3_constants.NormalizeObjectKey(object)
|
||
|
||
bucketDir := s3a.bucketDir(bucket)
|
||
versionsObjectPath := normalizedObject + s3_constants.VersionsFolder
|
||
|
||
glog.V(1).Infof("doGetLatestObjectVersion: looking for latest version of %s/%s (normalized: %s, retries: %d)", bucket, object, normalizedObject, maxRetries)
|
||
|
||
// Get the .versions directory entry to read latest version metadata with retry logic for filer consistency
|
||
var versionsEntry *filer_pb.Entry
|
||
var err error
|
||
for attempt := 1; attempt <= maxRetries; attempt++ {
|
||
versionsEntry, err = s3a.getEntry(bucketDir, versionsObjectPath)
|
||
if err == nil {
|
||
break
|
||
}
|
||
|
||
if attempt < maxRetries {
|
||
// Exponential backoff with higher base: 100ms, 200ms, 400ms, 800ms, 1600ms, 3200ms, 6400ms
|
||
delay := time.Millisecond * time.Duration(100*(1<<(attempt-1)))
|
||
time.Sleep(delay)
|
||
}
|
||
}
|
||
|
||
if err != nil {
|
||
// .versions directory doesn't exist - this can happen for objects that existed
|
||
// before versioning was enabled on the bucket. Fall back to checking for a
|
||
// regular (non-versioned) object file.
|
||
glog.V(1).Infof("getLatestObjectVersion: no .versions directory for %s/%s after %d attempts (error: %v), checking for pre-versioning object", bucket, normalizedObject, maxRetries, err)
|
||
|
||
regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject)
|
||
if regularErr != nil {
|
||
glog.V(1).Infof("getLatestObjectVersion: no pre-versioning object found for %s/%s (error: %v)", bucket, normalizedObject, regularErr)
|
||
return nil, fmt.Errorf("failed to get %s/%s .versions directory and no regular object found: %w", bucket, normalizedObject, err)
|
||
}
|
||
|
||
glog.V(1).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s", bucket, normalizedObject)
|
||
return regularEntry, nil
|
||
}
|
||
|
||
// Check if directory has latest version metadata - retry if missing due to race condition
|
||
if versionsEntry.Extended == nil {
|
||
// Retry a few times to handle the race condition where directory exists but metadata is not yet written
|
||
metadataRetries := 3
|
||
for metaAttempt := 1; metaAttempt <= metadataRetries; metaAttempt++ {
|
||
// Small delay and re-read the directory
|
||
time.Sleep(time.Millisecond * 100)
|
||
versionsEntry, err = s3a.getEntry(bucketDir, versionsObjectPath)
|
||
if err != nil {
|
||
break
|
||
}
|
||
|
||
if versionsEntry.Extended != nil {
|
||
break
|
||
}
|
||
}
|
||
|
||
// If still no metadata after retries, fall back to pre-versioning object
|
||
if versionsEntry.Extended == nil {
|
||
glog.V(2).Infof("getLatestObjectVersion: no Extended metadata in .versions directory for %s/%s after retries, checking for pre-versioning object", bucket, object)
|
||
|
||
regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject)
|
||
if regularErr != nil {
|
||
return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s/%s", bucket, normalizedObject)
|
||
}
|
||
|
||
glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s (no Extended metadata case)", bucket, object)
|
||
return regularEntry, nil
|
||
}
|
||
}
|
||
|
||
latestVersionIdBytes, hasLatestVersionId := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]
|
||
latestVersionFileBytes, hasLatestVersionFile := versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey]
|
||
|
||
if !hasLatestVersionId || !hasLatestVersionFile {
|
||
// No version metadata means all versioned objects have been deleted.
|
||
// Fall back to checking for a pre-versioning object.
|
||
glog.V(2).Infof("getLatestObjectVersion: no version metadata in .versions directory for %s/%s, checking for pre-versioning object", bucket, object)
|
||
|
||
regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject)
|
||
if regularErr != nil {
|
||
return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s/%s", bucket, normalizedObject)
|
||
}
|
||
|
||
glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s after version deletion", bucket, object)
|
||
return regularEntry, nil
|
||
}
|
||
|
||
latestVersionId := string(latestVersionIdBytes)
|
||
latestVersionFile := string(latestVersionFileBytes)
|
||
|
||
glog.V(2).Infof("getLatestObjectVersion: found latest version %s (file: %s) for %s/%s", latestVersionId, latestVersionFile, bucket, object)
|
||
|
||
// Get the actual latest version file entry
|
||
latestVersionPath := versionsObjectPath + "/" + latestVersionFile
|
||
latestVersionEntry, err := s3a.getEntry(bucketDir, latestVersionPath)
|
||
if err != nil {
|
||
// The pointer refers to a version file that no longer exists. Rather than
|
||
// surfacing a hard error that requires manual repair, rescan the .versions
|
||
// directory and self-heal the pointer to whatever remains.
|
||
if errors.Is(err, filer_pb.ErrNotFound) || status.Code(err) == codes.NotFound {
|
||
healed, healErr := s3a.healStaleLatestVersionPointer(bucket, normalizedObject, versionsEntry, latestVersionFile)
|
||
if healErr == nil {
|
||
return healed, nil
|
||
}
|
||
return nil, fmt.Errorf("stale latest-version pointer for %s/%s (file %s) could not self-heal: %w", bucket, normalizedObject, latestVersionFile, healErr)
|
||
}
|
||
return nil, fmt.Errorf("failed to get latest version file %s: %v", latestVersionPath, err)
|
||
}
|
||
|
||
return latestVersionEntry, nil
|
||
}
|
||
|
||
// selectLatestVersion returns the chronologically newest entry with a version
|
||
// id, including delete markers. isDeleteMarker reflects whether the selected
|
||
// entry is a delete marker. Returns nil for latestEntry when the directory
|
||
// contains no version-id-tagged entries.
|
||
//
|
||
// This is the correct selector for the self-heal path: the .versions pointer
|
||
// tracks the current-version-regardless-of-type (see createDeleteMarker), and
|
||
// promoting a delete marker keeps S3 semantics — downstream handlers observe
|
||
// ExtDeleteMarkerKey on the returned entry and respond with NoSuchKey.
|
||
func selectLatestVersion(entries []*filer_pb.Entry) (latestEntry *filer_pb.Entry, latestVersionId, latestVersionFileName string, isDeleteMarker bool) {
|
||
for _, entry := range entries {
|
||
if entry == nil || entry.Extended == nil {
|
||
continue
|
||
}
|
||
|
||
versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
|
||
if !hasVersionId {
|
||
continue
|
||
}
|
||
|
||
versionId := string(versionIdBytes)
|
||
// compareVersionIds returns negative when the first arg is newer
|
||
if latestVersionId == "" || compareVersionIds(versionId, latestVersionId) < 0 {
|
||
latestVersionId = versionId
|
||
latestVersionFileName = entry.Name
|
||
latestEntry = entry
|
||
isDeleteMarker = string(entry.Extended[s3_constants.ExtDeleteMarkerKey]) == "true"
|
||
}
|
||
}
|
||
return
|
||
}
|
||
|
||
// healStaleLatestVersionPointer is invoked when the .versions directory metadata
// points to a version file that no longer exists. It paginates the directory,
// picks the chronologically newest remaining entry (content version or delete
// marker), updates the directory pointer metadata best-effort, and returns the
// rescanned entry. Downstream handlers detect ExtDeleteMarkerKey on the
// returned entry and render NoSuchKey, so promoting a delete marker preserves
// correct S3 semantics. If no version-tagged entry remains an error is
// returned and the caller surfaces it as not found.
func (s3a *S3ApiServer) healStaleLatestVersionPointer(bucket, normalizedObject string, versionsEntry *filer_pb.Entry, stalePointerFile string) (*filer_pb.Entry, error) {
	bucketDir := s3a.bucketDir(bucket)
	versionsObjectPath := normalizedObject + s3_constants.VersionsFolder
	versionsDir := bucketDir + "/" + versionsObjectPath

	// grep-able heal event: a stale pointer has surfaced on a read path.
	versioningHealWarningf("surfaced", "bucket=%s key=%s missing_file=%s rescanning=%s", bucket, normalizedObject, stalePointerFile, versionsDir)

	// Paginate through all version entries and keep a running best candidate.
	// A single-shot list would miss the true latest when old-format (raw
	// timestamp) version ids spill past one page, since filesystem order is
	// lexicographic-ascending = oldest-first for that format.
	//
	// Pick the chronologically newest entry regardless of type. Promoting a
	// delete marker is correct: S3 semantics treat it as the current version
	// and the caller renders NoSuchKey (with x-amz-delete-marker) from the
	// returned entry. Restricting to content versions here would "undelete"
	// the object by promoting an older content version over a newer marker.
	var (
		latestEntry           *filer_pb.Entry
		latestVersionId       string
		latestVersionFileName string
		isDeleteMarker        bool
		startFrom             string
	)
	for {
		entries, isLast, err := s3a.list(versionsDir, "", startFrom, false, filer.PaginationSize)
		if err != nil {
			return nil, fmt.Errorf("list %s: %w", versionsDir, err)
		}
		// Compare this page's best against the running best across pages;
		// compareVersionIds returns negative when the first arg is newer.
		if pageEntry, pageId, pageFile, pageDM := selectLatestVersion(entries); pageEntry != nil {
			if latestEntry == nil || compareVersionIds(pageId, latestVersionId) < 0 {
				latestEntry = pageEntry
				latestVersionId = pageId
				latestVersionFileName = pageFile
				isDeleteMarker = pageDM
			}
		}
		if isLast || len(entries) == 0 {
			break
		}
		startFrom = entries[len(entries)-1].Name
	}

	if latestEntry == nil {
		// Best-effort clear the stale latest-version pointer so subsequent
		// reads short-circuit to ErrNotFound directly instead of replaying
		// getLatestObjectVersion's read-retry loop and re-entering self-heal
		// on every request. Orphan entries (files in .versions/ that lack
		// the version-id extended attribute) remain in place; from the S3
		// API perspective the object is correctly absent.
		s3a.clearStaleLatestVersionPointer(bucket, normalizedObject, bucketDir, versionsObjectPath, versionsEntry, "healStaleLatestVersionPointer")
		// Wrap filer_pb.ErrNotFound so callers can distinguish genuine
		// object-absence (nothing left to promote) from scan failures
		// (I/O errors during list) via errors.Is.
		return nil, fmt.Errorf("%w: no remaining version in %s", filer_pb.ErrNotFound, versionsDir)
	}

	// Repoint the directory metadata at the surviving latest version and
	// refresh the cached list metadata used by single-scan listing.
	if versionsEntry.Extended == nil {
		versionsEntry.Extended = make(map[string][]byte)
	}
	versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(latestVersionId)
	versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(latestVersionFileName)
	setCachedListMetadata(versionsEntry, latestEntry)

	if mkErr := s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
		updatedEntry.Extended = versionsEntry.Extended
		updatedEntry.Attributes = versionsEntry.Attributes
		updatedEntry.Chunks = versionsEntry.Chunks
	}); mkErr != nil {
		// Persisting the repair is best-effort. Surface a warning but still
		// return the rescanned entry so the read succeeds; a subsequent write
		// on the object will persist a fresh pointer.
		versioningHealWarningf("heal_persist_failed", "bucket=%s key=%s err=%v (returning rescanned entry)", bucket, normalizedObject, mkErr)
	} else {
		versioningHealInfof("healed", "bucket=%s key=%s mode=pointer_repaired new_version=%s file=%s delete_marker=%v", bucket, normalizedObject, latestVersionId, latestVersionFileName, isDeleteMarker)
	}
	return latestEntry, nil
}
|
||
|
||
// getLatestVersionEntryFromDirectoryEntry creates a logical entry for list operations using cached metadata
// from the .versions directory entry. This achieves SINGLE-SCAN efficiency - no additional getEntry calls needed.
//
// For N versioned objects:
//   - Before: N×1 to N×12 find operations per list
//   - After: 0 extra find operations (all metadata cached in .versions directory)
//
// Returns ErrDeleteMarker if the latest version is a delete marker (expected condition, not an error).
func (s3a *S3ApiServer) getLatestVersionEntryFromDirectoryEntry(bucket, object string, versionsDirEntry *filer_pb.Entry) (*filer_pb.Entry, error) {
	// Defensive nil check
	if versionsDirEntry == nil {
		return nil, fmt.Errorf("nil .versions directory entry")
	}

	normalizedObject := s3_constants.NormalizeObjectKey(object)

	// Check if the directory entry has latest version metadata
	if versionsDirEntry.Extended == nil {
		return nil, fmt.Errorf("no Extended metadata in .versions directory entry")
	}

	latestVersionIdBytes, hasLatestVersionId := versionsDirEntry.Extended[s3_constants.ExtLatestVersionIdKey]
	if !hasLatestVersionId {
		return nil, fmt.Errorf("missing latest version ID metadata in .versions directory entry")
	}

	// Check if this is a delete marker (should not be shown in regular list)
	if isDeleteMarker, exists := versionsDirEntry.Extended[s3_constants.ExtLatestVersionIsDeleteMarker]; exists && string(isDeleteMarker) == "true" {
		return nil, ErrDeleteMarker
	}

	latestVersionId := string(latestVersionIdBytes)

	// Try to use cached metadata for zero-copy list (single-scan efficiency).
	// All three of size/mtime/etag must be present for the fast path.
	sizeBytes, hasSize := versionsDirEntry.Extended[s3_constants.ExtLatestVersionSizeKey]
	mtimeBytes, hasMtime := versionsDirEntry.Extended[s3_constants.ExtLatestVersionMtimeKey]
	etagBytes, hasEtag := versionsDirEntry.Extended[s3_constants.ExtLatestVersionETagKey]

	if hasSize && hasMtime && hasEtag {
		size, sizeErr := strconv.ParseUint(string(sizeBytes), 10, 64)
		mtime, mtimeErr := strconv.ParseInt(string(mtimeBytes), 10, 64)
		if sizeErr == nil && mtimeErr == nil {
			// Use cached metadata - no getEntry call needed!
			glog.V(3).Infof("getLatestVersionEntryFromDirectoryEntry: using cached metadata for %s/%s (size=%d, mtime=%d)", bucket, normalizedObject, size, mtime)

			// Synthesize an entry named after the object (not the internal
			// versioned file name) so list responses show the logical key.
			logicalEntry := &filer_pb.Entry{
				Name:        path.Base(normalizedObject),
				IsDirectory: false,
				Attributes: &filer_pb.FuseAttributes{
					FileSize: size,
					Mtime:    mtime,
				},
				Extended: map[string][]byte{
					s3_constants.ExtVersionIdKey: []byte(latestVersionId),
					s3_constants.ExtETagKey:      etagBytes,
				},
			}

			// Attempt to parse the ETag and set it as Md5 attribute for compatibility with filer.ETag().
			// This is a partial fix for single-part uploads. Multipart ETags will still use ExtETagKey.
			// Multipart ETags contain a "-" (e.g. "abc-3") and are skipped here.
			if len(etagBytes) >= 2 && etagBytes[0] == '"' && etagBytes[len(etagBytes)-1] == '"' {
				unquotedEtag := etagBytes[1 : len(etagBytes)-1]
				if !bytes.Contains(unquotedEtag, []byte("-")) {
					if md5bytes, err := hex.DecodeString(string(unquotedEtag)); err == nil {
						logicalEntry.Attributes.Md5 = md5bytes
					}
				}
			}

			// Add owner if cached
			if ownerBytes, hasOwner := versionsDirEntry.Extended[s3_constants.ExtLatestVersionOwnerKey]; hasOwner {
				logicalEntry.Extended[s3_constants.ExtAmzOwnerKey] = ownerBytes
			}

			return logicalEntry, nil
		}
		// Cached values were present but unparseable; fall through to the
		// slow path below rather than failing the list.
		glog.Warningf("getLatestVersionEntryFromDirectoryEntry: failed to parse cached metadata for %s/%s, falling back. sizeErr:%v, mtimeErr:%v", bucket, normalizedObject, sizeErr, mtimeErr)
	}

	// Fallback: fetch version file if cached metadata not available (for older versions)
	latestVersionFileBytes, hasLatestVersionFile := versionsDirEntry.Extended[s3_constants.ExtLatestVersionFileNameKey]
	if !hasLatestVersionFile {
		return nil, fmt.Errorf("missing latest version file name metadata in .versions directory entry")
	}
	latestVersionFile := string(latestVersionFileBytes)

	glog.V(3).Infof("getLatestVersionEntryFromDirectoryEntry: fetching version file for %s/%s (no cached metadata)", bucket, normalizedObject)

	bucketDir := s3a.bucketDir(bucket)
	versionsObjectPath := path.Join(normalizedObject, s3_constants.VersionsFolder)
	latestVersionPath := path.Join(versionsObjectPath, latestVersionFile)
	latestVersionEntry, err := s3a.getEntry(bucketDir, latestVersionPath)
	if err != nil {
		return nil, fmt.Errorf("failed to get latest version file %s: %v", latestVersionPath, err)
	}

	// Check if this is a delete marker (should not be shown in regular list)
	if latestVersionEntry.Extended != nil {
		if deleteMarker, exists := latestVersionEntry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
			return nil, ErrDeleteMarker
		}
	}

	// Create a logical entry that appears at the object path (not the versioned path)
	logicalEntry := &filer_pb.Entry{
		Name:        path.Base(normalizedObject),
		IsDirectory: false,
		Attributes:  latestVersionEntry.Attributes,
		Extended:    latestVersionEntry.Extended,
		Chunks:      latestVersionEntry.Chunks,
	}

	return logicalEntry, nil
}
|
||
|
||
// getObjectOwnerFromVersion extracts object owner information from version metadata
|
||
func (s3a *S3ApiServer) getObjectOwnerFromVersion(version *ObjectVersion, bucket, objectKey string) CanonicalUser {
|
||
// First try to get owner from the version's OwnerID field (extracted during listing)
|
||
if version.OwnerID != "" {
|
||
ownerDisplayName := s3a.iam.GetAccountNameById(version.OwnerID)
|
||
return CanonicalUser{ID: version.OwnerID, DisplayName: ownerDisplayName}
|
||
}
|
||
|
||
// Fallback: fetch the specific version entry to get the owner
|
||
// This handles cases where OwnerID wasn't populated during listing
|
||
if specificVersionEntry, err := s3a.getSpecificObjectVersion(bucket, objectKey, version.VersionId); err == nil && specificVersionEntry.Extended != nil {
|
||
if ownerBytes, exists := specificVersionEntry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
|
||
ownerId := string(ownerBytes)
|
||
ownerDisplayName := s3a.iam.GetAccountNameById(ownerId)
|
||
return CanonicalUser{ID: ownerId, DisplayName: ownerDisplayName}
|
||
}
|
||
}
|
||
|
||
// Fallback: return anonymous if no owner found
|
||
return CanonicalUser{ID: s3_constants.AccountAnonymousId, DisplayName: "anonymous"}
|
||
}
|
||
|
||
func entryExtended(v *ObjectVersion) map[string][]byte {
|
||
return map[string][]byte{
|
||
s3_constants.AmzStorageClass: []byte(v.StorageClass),
|
||
s3_constants.ExtAmzOwnerKey: []byte(v.OwnerID),
|
||
s3_constants.ExtETagKey: []byte(v.ETag),
|
||
}
|
||
}
|
||
|
||
// getObjectOwnerFromEntry extracts object owner information from a file entry
|
||
func (s3a *S3ApiServer) getObjectOwnerFromEntry(entry *filer_pb.Entry) CanonicalUser {
|
||
if entry != nil && entry.Extended != nil {
|
||
if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
|
||
ownerId := string(ownerBytes)
|
||
ownerDisplayName := s3a.iam.GetAccountNameById(ownerId)
|
||
return CanonicalUser{ID: ownerId, DisplayName: ownerDisplayName}
|
||
}
|
||
}
|
||
|
||
// Fallback: return anonymous if no owner found
|
||
return CanonicalUser{ID: s3_constants.AccountAnonymousId, DisplayName: "anonymous"}
|
||
}
|