seaweedfs/weed/shell/command_s3_versions_audit.go
Chris Lu 79859fc21d feat(s3/versioning): grep-able heal logs + scan-anomaly diagnostics + audit cmd (#9468)
* feat(s3/versioning): grep-able heal logs + scan-anomaly diagnostics + audit cmd

Three diagnostic additions on top of #9460, all aimed at making the next
production incident faster to triage than the one we just spent hours on.

1. [versioning-heal] grep prefix on every heal-related log line, with a
   small fixed event vocabulary (produced / surfaced / healed / enqueue /
   drain / retry / gave_up / anomaly / clear_failed / heal_persist_failed
   / teardown_failed / queue_full). One grep gives operators a single
   event stream across the produce-to-drain lifecycle (see the sketch
   after this list).

2. Escalate the "scanned N>0 entries but no valid latest" case in
   updateLatestVersionAfterDeletion from V(1) Infof to a Warning that
   names the orphan entries it saw. This is the listing-after-rm
   inconsistency signature that pinned down 259064a8's failure — it
   should not be invisible at default log levels.

3. New weed shell command `s3.versions.audit -prefix <path> [-v] [-heal]`
   that walks .versions/ directories under a prefix and reports the
   stranded population. With -heal it clears the latest-version pointer
   in place on stranded directories so subsequent reads return a clean
   NoSuchKey instead of replaying the 10-retry self-heal loop.
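
For (1), the helper's shape is roughly the following (a sketch only: the
package, signature, and verbosity level are assumptions, not copied from
the tree):

package s3api

import "github.com/seaweedfs/seaweedfs/weed/glog"

// versioningHealInfof prefixes every heal-related line with the grep
// target and leaves the caller's format string untouched; callers put a
// fixed-vocabulary event=... token in that format string.
func versioningHealInfof(format string, args ...interface{}) {
	glog.V(1).Infof("[versioning-heal] "+format, args...)
}

// Hypothetical call site:
//   versioningHealInfof("event=enqueue bucket=%s key=%s reason=%s", bucket, key, reason)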

* fix(s3/versioning): audit pagination, exclusive categories, ctx-aware retry

Address PR review:

1. s3.versions.audit walked only the first 1024-entry page of each
   .versions/ directory, producing false-positive "stranded" reports on
   large dirs. It now loops until a page returns fewer than 1024 entries,
   advancing startName between pages.

2. The clean and orphan-only categories double-counted: a directory with
   no pointer and at least one orphan incremented both. They are now
   mutually exclusive, so the report's category totals sum to versionsDirs.

3. retryFilerOp's worst-case ~6.3s backoff was a bare time.Sleep,
   non-interruptible by ctx. A server shutdown / client disconnect
   would wait out the budget per in-flight delete. Thread ctx through
   deleteSpecificObjectVersion -> repointLatestBeforeDeletion /
   updateLatestVersionAfterDeletion -> retryFilerOp; backoff now uses
   a select{<-ctx.Done(), <-timer.C}. HTTP handlers pass r.Context();
   gRPC lifecycle handlers pass the stream ctx.

   New test pins the behavior: cancelling ctx mid-backoff returns
   ctx.Err() in <500ms instead of blocking ~6.3s.
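
A minimal sketch of the interruptible backoff in (3); the attempt budget
and base delay are inferred from the ~6.3s figure, and the real
retryFilerOp signature may differ:

package s3api

import (
	"context"
	"time"
)

func retryFilerOp(ctx context.Context, op func() error) error {
	const maxAttempts = 7
	backoff := 100 * time.Millisecond // 6 doubling sleeps, ~6.3s worst case
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = op(); err == nil {
			return nil
		}
		if attempt == maxAttempts {
			break
		}
		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			// Cancelled mid-backoff: stop the timer and surface ctx.Err()
			// promptly instead of sleeping out the remaining budget.
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
		backoff *= 2
	}
	return err
}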

* fix(s3/versioning): clearStale outcome + escape grep-able log fields

Two coderabbit follow-ups:

1. Successful pointer clear should suppress `produced`.
   updateLatestVersionAfterDeletion's transient-rm fallback called
   clearStaleLatestVersionPointer best-effort, then unconditionally
   returned retryErr. The caller (deleteSpecificObjectVersion) saw the
   error and emitted `event=produced` + enqueued the reconciler, even
   though clearStaleLatestVersionPointer had just driven the pointer to
   consistency and the next reader would get NoSuchKey via the
   clean-miss path. Make clearStaleLatestVersionPointer return cleared
   bool; on success the caller returns nil so neither produced nor the
   reconciler enqueue fires (sketched after this list). Concurrent-writer
   aborts, re-scan errors,
   and CAS mismatches still report false so genuinely stranded state
   keeps surfacing.

2. Escape user-controlled fields in heal log lines.
   versioningHealInfof / Warningf / Errorf interpolated raw bucket /
   key / filename / err text into a single-space-separated line. An S3
   key (or error string from gRPC) containing whitespace, newlines, or
   `event=...` could split one event into multiple tokens and spoof
   fake fields downstream. Sanitize each arg in the helper: safe
   values pass through; anything with whitespace, quotes, control
   chars, or backslashes is replaced with its strconv.Quote form. No
   caller changes — the format strings remain unchanged (the sanitizer is
   sketched after this list).
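
For (1), the caller-side shape in miniature (a sketch: the wrapper name
and signatures are illustrative, not copied from the tree):

package s3api

import "context"

// afterTransientRmFailure (hypothetical name) is the fallback path in
// updateLatestVersionAfterDeletion once the transient rm has exhausted
// its retries and clearStaleLatestVersionPointer has been attempted.
func afterTransientRmFailure(ctx context.Context, retryErr error,
	clearStale func(context.Context) (cleared bool, err error)) error {
	cleared, clearErr := clearStale(ctx)
	if clearErr == nil && cleared {
		// Pointer driven to consistency: the next reader takes the clean-miss
		// path and gets NoSuchKey, so suppress event=produced and skip the
		// reconciler enqueue.
		return nil
	}
	// Concurrent-writer abort, re-scan error, or CAS mismatch: keep the
	// original error so the stranded state keeps surfacing.
	return retryErr
}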
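
For (2), the per-arg sanitizer reduces to something like this (sketch;
the helper name and the exact unsafe-character set are assumptions):

package s3api

import (
	"strconv"
	"strings"
)

// sanitizeHealLogArg keeps safe values untouched and replaces anything
// that could split the space-separated line or spoof a key=value field
// with its strconv.Quote form, so the value stays one quoted token.
func sanitizeHealLogArg(s string) string {
	needsQuote := strings.ContainsAny(s, " \t\n\r\"'\\") ||
		strings.ContainsFunc(s, func(r rune) bool { return r < 0x20 || r == 0x7f })
	if needsQuote {
		return strconv.Quote(s)
	}
	return s
}

In the real helpers each argument would be rendered to a string and passed
through this before interpolation, which is why no call sites change.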

Tests pin both behaviors: sanitization table covers the field
boundary cases; an end-to-end shape test confirms a key containing
`event=spoof` stays inside a single quoted token.
2026-05-13 10:48:58 -07:00


package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"strings"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	s3_constants "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
func init() {
	Commands = append(Commands, &commandS3VersionsAudit{})
}

type commandS3VersionsAudit struct{}

func (c *commandS3VersionsAudit) Name() string {
	return "s3.versions.audit"
}

func (c *commandS3VersionsAudit) Help() string {
	return `audit .versions/ directories under a prefix for stranded pointer/missing-file state

	Walks every entry under the given prefix and, for each directory whose
	name ends in ".versions", checks whether its extended-attr latest-version
	pointer references a file that actually exists in the directory.

	Reports counts for:
	  - directories scanned
	  - clean (no pointer, or pointer matches existing file)
	  - stranded (pointer set but file is missing) — the symptom seen by
	    Veeam/etc. as "Storage not found" on the next GET
	  - orphan (directory has files lacking the version-id extended attr,
	    which the post-delete cleanup path will refuse to rm)

	Example:
		# Audit a whole bucket
		s3.versions.audit -prefix /buckets/mybucket

		# Audit a specific client subtree, print each finding
		s3.versions.audit -prefix /buckets/mybucket/Veeam/Backup/groupsoftware/Clients/<uuid>/ -v

		# Dry run (default) — read-only, prints what would be healed
		# Add -heal to clear stranded pointers in place (calls the same path
		# the read-side self-heal uses)
		s3.versions.audit -prefix /buckets/mybucket -heal

	This command is read-only by default. With -heal, it clears the stale
	latest-version pointer on stranded directories; the blob is already
	gone, so reads then return NoSuchKey via the clean-miss path instead
	of replaying the 10-retry self-heal loop on every request.
`
}

func (c *commandS3VersionsAudit) HasTag(CommandTag) bool {
	return false
}
func (c *commandS3VersionsAudit) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
	cmd := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	prefix := cmd.String("prefix", "", "filer path to audit recursively (e.g. /buckets/mybucket)")
	verbose := cmd.Bool("v", false, "print each stranded/orphan directory as it's found")
	doHeal := cmd.Bool("heal", false, "clear the latest-version pointer on stranded directories (default: read-only)")
	if err := cmd.Parse(args); err != nil {
		return err
	}
	if *prefix == "" {
		return fmt.Errorf("-prefix is required")
	}

	// Counters
	var (
		dirsScanned  uint64
		versionsDirs uint64
		clean        uint64
		stranded     uint64
		orphanOnly   uint64
		healed       uint64
		healFailed   uint64
	)

	start := time.Now()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		return filer_pb.TraverseBfs(ctx, &filerClientWrapper{client: client}, util.FullPath(*prefix), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
			atomic.AddUint64(&dirsScanned, 1)
			if !entry.IsDirectory {
				return nil
			}
			if !strings.HasSuffix(entry.Name, ".versions") {
				return nil
			}
			atomic.AddUint64(&versionsDirs, 1)

			// What does the pointer name?
			var pointerFile string
			if entry.Extended != nil {
				if v, ok := entry.Extended[s3_constants.ExtLatestVersionFileNameKey]; ok {
					pointerFile = string(v)
				}
			}

			// List the children to see if the pointer's file exists and to
			// count entries without an ExtVersionIdKey (orphans that block
			// non-recursive teardown). filer_pb.List returns one 1024-entry
			// page; walk all pages so a large .versions/ directory doesn't
			// produce a false positive "stranded" report from only seeing
			// the first page.
			versionsPath := string(parentPath) + "/" + entry.Name
			pointerSeen := false
			hasOrphan := false
			const auditPageSize = 1024
			var startName string
			for {
				pageEntries := 0
				var lastEntryName string
				lookupErr := filer_pb.List(ctx, &filerClientWrapper{client: client}, versionsPath, "", func(child *filer_pb.Entry, isLast bool) error {
					if child == nil {
						return nil
					}
					pageEntries++
					lastEntryName = child.Name
					hasVersionId := false
					if child.Extended != nil {
						if _, ok := child.Extended[s3_constants.ExtVersionIdKey]; ok {
							hasVersionId = true
						}
					}
					if pointerFile != "" && child.Name == pointerFile {
						pointerSeen = true
					}
					if !hasVersionId {
						hasOrphan = true
					}
					return nil
				}, startName, startName != "", auditPageSize)
				if lookupErr != nil {
					fmt.Fprintf(writer, "list %s: %v\n", versionsPath, lookupErr)
					return nil
				}
				if pageEntries < auditPageSize {
					break
				}
				startName = lastEntryName
			}

			switch {
			case pointerFile == "":
				// No pointer set. Orphan-only and clean are now mutually
				// exclusive so the final report's category counts sum to
				// versionsDirs.
				if hasOrphan {
					atomic.AddUint64(&orphanOnly, 1)
				} else {
					atomic.AddUint64(&clean, 1)
				}
			case pointerSeen:
				atomic.AddUint64(&clean, 1)
			default:
				// Pointer names a file that the listing does NOT contain.
				atomic.AddUint64(&stranded, 1)
				if *verbose {
					fmt.Fprintf(writer, "stranded: %s pointer=%s orphan=%v\n", versionsPath, pointerFile, hasOrphan)
				}
				if *doHeal {
					if err := healStrandedPointer(ctx, client, parentPath, entry); err != nil {
						atomic.AddUint64(&healFailed, 1)
						fmt.Fprintf(writer, "heal failed: %s: %v\n", versionsPath, err)
					} else {
						atomic.AddUint64(&healed, 1)
					}
				}
			}
			return nil
		})
	})

	elapsed := time.Since(start)
	fmt.Fprintf(writer, "audit complete in %s\n", elapsed)
	fmt.Fprintf(writer, " total entries scanned : %d\n", dirsScanned)
	fmt.Fprintf(writer, " .versions/ directories: %d\n", versionsDirs)
	fmt.Fprintf(writer, " clean                 : %d\n", clean)
	fmt.Fprintf(writer, " stranded              : %d\n", stranded)
	fmt.Fprintf(writer, " orphan-only           : %d\n", orphanOnly)
	if *doHeal {
		fmt.Fprintf(writer, " healed                : %d\n", healed)
		fmt.Fprintf(writer, " heal failed           : %d\n", healFailed)
	}
	return err
}
// healStrandedPointer clears the latest-version pointer extended attrs
// on a stranded .versions/ directory. The blob the pointer names is
// already gone; clearing the pointer makes subsequent reads return
// NoSuchKey via the clean-miss path instead of replaying the read-side
// self-heal on every request.
func healStrandedPointer(ctx context.Context, client filer_pb.SeaweedFilerClient, parentPath util.FullPath, entry *filer_pb.Entry) error {
	if entry.Extended == nil {
		return nil
	}
	delete(entry.Extended, s3_constants.ExtLatestVersionIdKey)
	delete(entry.Extended, s3_constants.ExtLatestVersionFileNameKey)
	// Also clear the cached list metadata so a stale size/mtime/etag
	// can't be served back; those will be repopulated on the next PUT.
	delete(entry.Extended, s3_constants.ExtLatestVersionSizeKey)
	delete(entry.Extended, s3_constants.ExtLatestVersionMtimeKey)
	delete(entry.Extended, s3_constants.ExtLatestVersionETagKey)
	delete(entry.Extended, s3_constants.ExtLatestVersionOwnerKey)
	delete(entry.Extended, s3_constants.ExtLatestVersionIsDeleteMarker)
	_, err := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
		Directory: string(parentPath),
		Entry:     entry,
	})
	return err
}
// filerClientWrapper adapts a raw SeaweedFilerClient to the
// filer_pb.FilerClient interface that List / TraverseBfs expect.
type filerClientWrapper struct {
	client filer_pb.SeaweedFilerClient
}

func (w *filerClientWrapper) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
	return fn(w.client)
}

func (w *filerClientWrapper) AdjustedUrl(location *filer_pb.Location) string {
	return location.Url
}

func (w *filerClientWrapper) GetDataCenter() string {
	return ""
}