Files
seaweedfs/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go
Chris Lu a4753b6a3b S3: delay empty folder cleanup to prevent Spark write failures (#8970)
* S3: delay empty folder cleanup to prevent Spark write failures (#8963)

Empty folders were being cleaned up within seconds, causing Apache Spark
(s3a) writes to fail when temporary directories like _temporary/0/task_xxx/
were briefly empty.

- Increase default cleanup delay from 5s to 2 minutes
- Only process queue items that have individually aged past the delay
  (previously the entire queue was drained once any item triggered)
- Make the delay configurable via filer.toml:
  [filer.options]
  s3.empty_folder_cleanup_delay = "2m"

* test: increase cleanup wait timeout to match 2m delay

The empty folder cleanup delay was increased to 2 minutes, so the
Spark integration test needs to wait longer for temporary directories
to disappear.

* fix: eagerly clean parent directories after empty folder deletion

After deleting an empty folder, immediately try to clean its parent
rather than relying on cascading metadata events that each re-enter
the 2-minute delay queue. This prevents multi-minute waits when
cleaning nested temporary directory trees (e.g. Spark's _temporary
hierarchy with 3+ levels would take 6m+ vs near-instant).

Fixes the CI failure where lingering _temporary parent directories
were not cleaned within the test's 3-minute timeout.
2026-04-07 13:20:59 -07:00

529 lines
17 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package empty_folder_cleanup
import (
"context"
"strings"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// Default tuning values applied by NewEmptyFolderCleaner.
const (
// DefaultMaxCountCheck is the default limit passed to CountDirectoryEntries
// when checking whether a folder is empty.
DefaultMaxCountCheck = 1000
// DefaultCacheExpiry is the default lifetime of cached folder counts and
// bucket cleanup policies; it is also used as the cache eviction interval.
DefaultCacheExpiry = 5 * time.Minute
// DefaultQueueMaxSize is passed as the first argument to NewCleanupQueue
// (presumably the maximum number of queued folders — confirm against CleanupQueue).
DefaultQueueMaxSize = 1000
// DefaultQueueMaxAge is the cleanup delay used when the caller supplies a
// non-positive delay to NewEmptyFolderCleaner.
DefaultQueueMaxAge = 2 * time.Minute
DefaultProcessorSleep = 30 * time.Second // How often to check queue
)
// FilerOperations defines the filer operations needed by EmptyFolderCleaner.
// It is the seam through which the cleaner counts, inspects and deletes
// directories.
type FilerOperations interface {
// CountDirectoryEntries reports how many entries dirPath holds. The cleaner
// passes its maxCountCheck as limit (presumably an upper bound on the count).
CountDirectoryEntries(ctx context.Context, dirPath util.FullPath, limit int) (count int, err error)
// DeleteEntryMetaAndData deletes the entry at p. The cleaner calls it with
// every boolean flag false, nil signatures and ifNotModifiedAfter 0.
DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32, ifNotModifiedAfter int64) error
// GetEntryAttributes returns the attributes of the entry at p. The cleaner
// reads the bucket entry's attributes to decide its cleanup policy.
GetEntryAttributes(ctx context.Context, p util.FullPath) (attributes map[string][]byte, err error)
// IsDirectoryKeyObject reports whether p is an explicitly created directory
// marker; the cleaner never deletes such directories.
IsDirectoryKeyObject(ctx context.Context, p util.FullPath) (bool, error)
}
// folderState tracks the state of a folder for empty folder cleanup.
// Instances live in EmptyFolderCleaner.folderCounts and are read and written
// while holding EmptyFolderCleaner.mu.
type folderState struct {
roughCount int // Cached rough count (up to maxCountCheck)
lastAddTime time.Time // Last time an item was added
lastDelTime time.Time // Last time an item was deleted
lastCheck time.Time // Last time we checked the actual count
}
// bucketCleanupPolicyState is the cached cleanup policy of a single bucket,
// as resolved by getBucketCleanupPolicy from the bucket entry's attributes.
type bucketCleanupPolicyState struct {
// autoRemove is true when empty folders in the bucket may be deleted.
autoRemove bool
// attrValue is the attribute text (or a "<...>" placeholder) the decision
// was derived from; it is only used in log messages.
attrValue string
// lastCheck is when the policy was loaded; entries older than cacheExpiry
// are reloaded or evicted.
lastCheck time.Time
}
// EmptyFolderCleaner handles asynchronous cleanup of empty folders.
// Each filer owns specific folders via consistent hashing based on the peer filer list.
// Delete events enqueue candidate folders; a background processor later
// verifies that a folder is really empty before deleting it.
type EmptyFolderCleaner struct {
// filer performs the actual counting, attribute lookups and deletions.
filer FilerOperations
// lockRing resolves the primary (owning) filer for a folder.
lockRing *lock_manager.LockRing
// host is this filer's address, compared against the lockRing primary.
host pb.ServerAddress
// Folder state tracking
// mu guards folderCounts, bucketCleanupPolicies and enabled.
mu sync.RWMutex
folderCounts map[string]*folderState // Rough count cache
bucketCleanupPolicies map[string]*bucketCleanupPolicyState // bucket path -> cleanup policy cache
// Cleanup queue (thread-safe, has its own lock)
cleanupQueue *CleanupQueue
// Configuration
maxCountCheck int // Max items to count (1000)
cacheExpiry time.Duration // How long to keep cache entries
processorSleep time.Duration // How often processor checks queue
bucketPath string // e.g., "/buckets"
// Control
// enabled gates event handling and queue processing; Stop sets it to false.
enabled bool
// stopCh is closed by Stop to terminate the background goroutines.
stopCh chan struct{}
}
// NewEmptyFolderCleaner builds an enabled EmptyFolderCleaner and starts its
// two background goroutines (cache eviction and queue processing).
//
// cleanupDelay is how long a folder must sit in the cleanup queue before it
// may be deleted; a zero or negative value selects DefaultQueueMaxAge.
// Call Stop to terminate the background goroutines.
func NewEmptyFolderCleaner(filer FilerOperations, lockRing *lock_manager.LockRing, host pb.ServerAddress, bucketPath string, cleanupDelay time.Duration) *EmptyFolderCleaner {
	delay := cleanupDelay
	if delay <= 0 {
		delay = DefaultQueueMaxAge
	}

	cleaner := new(EmptyFolderCleaner)

	// Collaborators and identity.
	cleaner.filer = filer
	cleaner.lockRing = lockRing
	cleaner.host = host
	cleaner.bucketPath = bucketPath

	// Caches and the pending-cleanup queue.
	cleaner.folderCounts = make(map[string]*folderState)
	cleaner.bucketCleanupPolicies = make(map[string]*bucketCleanupPolicyState)
	cleaner.cleanupQueue = NewCleanupQueue(DefaultQueueMaxSize, delay)

	// Tuning defaults.
	cleaner.maxCountCheck = DefaultMaxCountCheck
	cleaner.cacheExpiry = DefaultCacheExpiry
	cleaner.processorSleep = DefaultProcessorSleep

	// Lifecycle control.
	cleaner.enabled = true
	cleaner.stopCh = make(chan struct{})

	go cleaner.cacheEvictionLoop()
	go cleaner.cleanupProcessor()

	return cleaner
}
// SetEnabled switches the cleaner on or off. The flag is written under the
// write lock so event handlers observe it consistently.
func (efc *EmptyFolderCleaner) SetEnabled(enabled bool) {
	efc.mu.Lock()
	efc.enabled = enabled
	efc.mu.Unlock()
}
// IsEnabled reports whether the cleaner is currently enabled.
func (efc *EmptyFolderCleaner) IsEnabled() bool {
	efc.mu.RLock()
	active := efc.enabled
	efc.mu.RUnlock()
	return active
}
// ownsFolder reports whether this filer is the primary for folder according
// to the lock ring. An empty primary (single filer, or no servers known) is
// treated as ownership.
func (efc *EmptyFolderCleaner) ownsFolder(folder string) bool {
	owner := efc.lockRing.GetPrimary(folder)
	return owner == "" || owner == efc.host
}
// OnDeleteEvent records that entryName was removed from directory and, when
// the cached count suggests the directory may now be empty, queues the
// directory for delayed cleanup.
//
// Deletions of files and of sub-directories are treated alike. eventTime is
// the time the delete happened; it is stored as the folder's last delete time
// and handed to the queue so out-of-order events are ordered correctly.
// Events outside a bucket, for folders this filer does not own, or received
// while the cleaner is disabled are ignored.
func (efc *EmptyFolderCleaner) OnDeleteEvent(directory string, entryName string, isDirectory bool, eventTime time.Time) {
	// Only paths of the form /buckets/<bucket>/... are candidates.
	if efc.bucketPath != "" && !isUnderBucketPath(directory, efc.bucketPath) {
		return
	}

	if owned := efc.ownsFolder(directory); !owned {
		glog.V(4).Infof("EmptyFolderCleaner: not owner of %s, skipping", directory)
		return
	}

	efc.mu.Lock()
	defer efc.mu.Unlock()

	// The enabled flag is read under the lock so it cannot race with Stop().
	if !efc.enabled {
		return
	}

	glog.V(3).Infof("EmptyFolderCleaner: delete event in %s/%s (isDir=%v)", directory, entryName, isDirectory)

	// Fetch the cached state, starting to track the folder if necessary.
	entry, tracked := efc.folderCounts[directory]
	if !tracked {
		entry = &folderState{}
		efc.folderCounts[directory] = entry
	}
	entry.lastDelTime = eventTime
	if entry.roughCount > 0 {
		entry.roughCount--
	}

	// A positive rough count means the folder still has entries: do not queue.
	if remaining := entry.roughCount; remaining > 0 {
		glog.V(3).Infof("EmptyFolderCleaner: skipping queue for %s, roughCount=%d", directory, remaining)
		return
	}

	if queued := efc.cleanupQueue.Add(directory, entryName, eventTime); queued {
		glog.V(3).Infof("EmptyFolderCleaner: queued %s for cleanup (triggered by %s)", directory, entryName)
	}
}
// OnCreateEvent records that an entry was created inside directory and
// cancels any pending cleanup for that directory.
//
// Creations of files and of sub-directories are treated alike. The cached
// count is only bumped for folders that are already tracked; untracked
// folders are not added to the cache. Events outside a bucket or received
// while the cleaner is disabled are ignored.
func (efc *EmptyFolderCleaner) OnCreateEvent(directory string, entryName string, isDirectory bool) {
	// Only paths of the form /buckets/<bucket>/... are relevant.
	if efc.bucketPath != "" && !isUnderBucketPath(directory, efc.bucketPath) {
		return
	}

	efc.mu.Lock()
	defer efc.mu.Unlock()

	// The enabled flag is read under the lock so it cannot race with Stop().
	if !efc.enabled {
		return
	}

	entry, tracked := efc.folderCounts[directory]
	if tracked {
		entry.roughCount++
		entry.lastAddTime = time.Now()
	}

	// The folder is no longer empty, so drop it from the pending queue.
	if cancelled := efc.cleanupQueue.Remove(directory); cancelled {
		glog.V(3).Infof("EmptyFolderCleaner: cancelled cleanup for %s due to new entry", directory)
	}
}
// cleanupProcessor is the background loop that wakes up every processorSleep
// and drains the eligible part of the cleanup queue. It returns once stopCh
// is closed.
func (efc *EmptyFolderCleaner) cleanupProcessor() {
	tick := time.NewTicker(efc.processorSleep)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			efc.processCleanupQueue()
		case <-efc.stopCh:
			return
		}
	}
}
// processCleanupQueue runs cleanup for every queued folder that has been
// waiting longer than the queue's maxAge. Younger entries stay queued.
// Processing stops early if the cleaner gets disabled.
func (efc *EmptyFolderCleaner) processCleanupQueue() {
	if efc.cleanupQueue.Len() == 0 {
		return
	}

	glog.V(3).Infof("EmptyFolderCleaner: processing cleanup queue (len=%d, oldest_age=%v)",
		efc.cleanupQueue.Len(), efc.cleanupQueue.OldestAge())

	// Re-check the enabled flag before each pop so a Stop() takes effect
	// between folders.
	for efc.IsEnabled() {
		folder, triggeredBy, ok := efc.cleanupQueue.PopOlderThan(efc.cleanupQueue.maxAge)
		if !ok {
			// Nothing left that is old enough.
			return
		}
		efc.executeCleanup(folder, triggeredBy)
	}
}
// executeCleanup verifies that folder is really empty and, if so, deletes it
// and then eagerly attempts the same for its parent.
//
// The folder is left alone when any of the following holds: the cache shows
// entries or an add newer than the last delete, this filer no longer owns the
// folder, the bucket's policy disables auto-removal (or cannot be loaded), the
// filer reports entries in it, or it is an explicit directory marker.
// triggeredBy is only used in log messages.
func (efc *EmptyFolderCleaner) executeCleanup(folder string, triggeredBy string) {
	// Cheap rejection based on cached state, before touching the filer.
	efc.mu.Lock()
	cached, tracked := efc.folderCounts[folder]
	switch {
	case tracked && cached.roughCount > 0:
		glog.V(3).Infof("EmptyFolderCleaner: skipping %s (triggered by %s), cached count=%d", folder, triggeredBy, cached.roughCount)
		efc.mu.Unlock()
		return
	case tracked && !cached.lastAddTime.IsZero() && cached.lastAddTime.After(cached.lastDelTime):
		// Something was added after the delete that queued this folder.
		glog.V(3).Infof("EmptyFolderCleaner: skipping %s (triggered by %s), add happened after delete", folder, triggeredBy)
		efc.mu.Unlock()
		return
	}
	efc.mu.Unlock()

	// Ownership may have moved since the folder was queued.
	if !efc.ownsFolder(folder) {
		glog.V(3).Infof("EmptyFolderCleaner: no longer owner of %s (triggered by %s), skipping", folder, triggeredBy)
		return
	}

	ctx := context.Background()

	bucketPath, autoRemove, source, attrValue, err := efc.getBucketCleanupPolicy(ctx, folder)
	if err != nil {
		// A missing bucket entry is not worth logging; anything else is.
		if err != filer_pb.ErrNotFound {
			glog.V(2).Infof("EmptyFolderCleaner: failed to load bucket cleanup policy for folder %s (triggered by %s): %v", folder, triggeredBy, err)
		}
		return
	}
	if !autoRemove {
		glog.V(3).Infof("EmptyFolderCleaner: skipping folder %s (triggered by %s), bucket %s auto-remove-empty-folders disabled (source=%s attr=%s)",
			folder, triggeredBy, bucketPath, source, attrValue)
		return
	}

	// Ask the filer for the real entry count (capped at maxCountCheck).
	itemCount, err := efc.countItems(ctx, folder)
	if err != nil {
		glog.V(2).Infof("EmptyFolderCleaner: error counting items in %s: %v", folder, err)
		return
	}

	// Refresh the cache with the authoritative count.
	efc.mu.Lock()
	entry, found := efc.folderCounts[folder]
	if !found {
		entry = &folderState{}
		efc.folderCounts[folder] = entry
	}
	entry.roughCount = itemCount
	entry.lastCheck = time.Now()
	efc.mu.Unlock()

	if itemCount > 0 {
		glog.V(4).Infof("EmptyFolderCleaner: folder %s (triggered by %s) has %d items, not empty", folder, triggeredBy, itemCount)
		return
	}

	// Explicitly created directory markers (e.g., PUT /bucket/folder/) must
	// survive even when they are empty.
	isMarker, err := efc.filer.IsDirectoryKeyObject(ctx, util.FullPath(folder))
	if err != nil {
		glog.V(2).Infof("EmptyFolderCleaner: error checking directory key object %s: %v", folder, err)
		return
	}
	if isMarker {
		glog.V(3).Infof("EmptyFolderCleaner: skipping %s (triggered by %s), explicit directory marker", folder, triggeredBy)
		return
	}

	glog.Infof("EmptyFolderCleaner: deleting empty folder %s (triggered by %s)", folder, triggeredBy)
	if err := efc.deleteFolder(ctx, folder); err != nil {
		glog.V(2).Infof("EmptyFolderCleaner: failed to delete empty folder %s (triggered by %s): %v", folder, triggeredBy, err)
		return
	}

	// The folder is gone; forget its cached state.
	efc.mu.Lock()
	delete(efc.folderCounts, folder)
	efc.mu.Unlock()

	// Walk up to the parent right away. Waiting for the cascading metadata
	// event would send every ancestor through the full delay queue again
	// (e.g. 3 levels x 2m = 6m+ for a nested temporary tree).
	parentDir, _ := util.FullPath(folder).DirAndName()
	if parentDir == "" || parentDir == folder {
		return
	}
	if efc.bucketPath == "" || !isUnderBucketPath(parentDir, efc.bucketPath) {
		return
	}

	// Drop any queued entry for the parent so a stale event does not process
	// it a second time later.
	efc.cleanupQueue.Remove(parentDir)
	efc.executeCleanup(parentDir, triggeredBy)
}
// countItems asks the filer how many entries folder contains, passing
// maxCountCheck as the counting limit.
func (efc *EmptyFolderCleaner) countItems(ctx context.Context, folder string) (int, error) {
	dir := util.FullPath(folder)
	return efc.filer.CountDirectoryEntries(ctx, dir, efc.maxCountCheck)
}
// deleteFolder removes folder through the filer as a non-recursive delete
// with every optional flag switched off.
func (efc *EmptyFolderCleaner) deleteFolder(ctx context.Context, folder string) error {
	const (
		isRecursive          = false
		ignoreRecursiveError = false
		shouldDeleteChunks   = false
		isFromOtherCluster   = false
		ifNotModifiedAfter   = 0
	)
	target := util.FullPath(folder)
	return efc.filer.DeleteEntryMetaAndData(ctx, target, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster, nil, ifNotModifiedAfter)
}
// getBucketCleanupPolicy resolves whether empty folders may be removed in the
// bucket that contains folder.
//
// Returns the bucket path, the auto-remove decision, the source of the
// decision ("default" when folder is not inside a bucket, "cache" for a
// cached policy no older than cacheExpiry, "filer" for a fresh lookup) and
// the attribute text behind the decision. A fresh lookup is stored in the
// policy cache. On a filer error, that error is returned and the other
// results must not be relied upon.
func (efc *EmptyFolderCleaner) getBucketCleanupPolicy(ctx context.Context, folder string) (bucketPath string, autoRemove bool, source string, attrValue string, err error) {
	bucketPath, inBucket := util.ExtractBucketPath(efc.bucketPath, folder, true)
	if !inBucket {
		return "", true, "default", "<not_bucket_path>", nil
	}

	now := time.Now()

	// Serve from the cache while the entry is still fresh.
	efc.mu.RLock()
	cached, hit := efc.bucketCleanupPolicies[bucketPath]
	fresh := hit && now.Sub(cached.lastCheck) <= efc.cacheExpiry
	efc.mu.RUnlock()
	if fresh {
		return bucketPath, cached.autoRemove, "cache", cached.attrValue, nil
	}

	attrs, err := efc.filer.GetEntryAttributes(ctx, util.FullPath(bucketPath))
	if err != nil {
		return "", true, "", "", err
	}
	autoRemove, attrValue = autoRemoveEmptyFoldersEnabled(attrs)

	policy := &bucketCleanupPolicyState{
		autoRemove: autoRemove,
		attrValue:  attrValue,
		lastCheck:  now,
	}

	efc.mu.Lock()
	if efc.bucketCleanupPolicies == nil {
		efc.bucketCleanupPolicies = make(map[string]*bucketCleanupPolicyState)
	}
	efc.bucketCleanupPolicies[bucketPath] = policy
	efc.mu.Unlock()

	return bucketPath, autoRemove, "filer", attrValue, nil
}
// autoRemoveEmptyFoldersEnabled derives the auto-remove decision from a
// bucket's attributes.
//
// Auto-removal is enabled unless the s3_constants.ExtAllowEmptyFolders
// attribute, after trimming whitespace, equals "true" case-insensitively.
// The second result is the trimmed attribute text, or a "<...>" placeholder
// when the map is nil, the attribute is absent, or its value is blank.
func autoRemoveEmptyFoldersEnabled(attrs map[string][]byte) (bool, string) {
	if attrs == nil {
		return true, "<no_attrs>"
	}

	raw, present := attrs[s3_constants.ExtAllowEmptyFolders]
	if !present {
		return true, "<missing>"
	}

	switch trimmed := strings.TrimSpace(string(raw)); {
	case trimmed == "":
		return true, "<empty>"
	case strings.EqualFold(trimmed, "true"):
		// Empty folders are explicitly allowed, so do not remove them.
		return false, trimmed
	default:
		return true, trimmed
	}
}
// isUnderPath reports whether child equals parent or lies beneath it.
// An empty parent or "/" contains every path. One trailing slash on parent
// is ignored, and the match is made on whole path segments, so "/ab" is not
// under "/a".
func isUnderPath(child, parent string) bool {
	if parent == "" || parent == "/" {
		return true
	}

	// Drop a single trailing slash so prefix matching works on segments.
	base := parent
	if last := len(base) - 1; base[last] == '/' {
		base = base[:last]
	}

	n := len(base)
	switch {
	case len(child) < n:
		return false
	case child[:n] != base:
		return false
	case len(child) == n:
		// Exactly the parent itself.
		return true
	default:
		// The prefix must end on a segment boundary.
		return child[n] == '/'
	}
}
// isUnderBucketPath reports whether directory lies inside a bucket, i.e. is
// at least <bucketPath>/<bucket>/<something>. Buckets themselves and the
// bucket root do not qualify, so they are never cleanup candidates.
// An empty bucketPath disables the restriction.
func isUnderBucketPath(directory, bucketPath string) bool {
	if bucketPath == "" {
		return true
	}

	// Ignore a single trailing slash on the bucket root.
	root := bucketPath
	if root[len(root)-1] == '/' {
		root = root[:len(root)-1]
	}

	if !isUnderPath(directory, root) {
		return false
	}

	// Depth is measured in slashes: for root "/buckets" (depth 1) the
	// shallowest accepted directory is "/buckets/mybucket/folder" (depth 3).
	minDepth := strings.Count(root, "/") + 2
	return strings.Count(directory, "/") >= minDepth
}
// cacheEvictionLoop is the background loop that purges stale cache entries
// once per cacheExpiry interval. It returns once stopCh is closed.
func (efc *EmptyFolderCleaner) cacheEvictionLoop() {
	tick := time.NewTicker(efc.cacheExpiry)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			efc.evictStaleCacheEntries()
		case <-efc.stopCh:
			return
		}
	}
}
// evictStaleCacheEntries drops folder states whose latest activity (count
// check, add or delete) is older than cacheExpiry, and bucket policies that
// were loaded longer ago than cacheExpiry. Folders still waiting in the
// cleanup queue keep their state.
func (efc *EmptyFolderCleaner) evictStaleCacheEntries() {
	efc.mu.Lock()
	defer efc.mu.Unlock()

	now := time.Now()
	evicted := 0

	for folder, state := range efc.folderCounts {
		// Pending cleanups still need their cached state.
		if efc.cleanupQueue.Contains(folder) {
			continue
		}

		// Latest of the three activity timestamps.
		latest := state.lastCheck
		for _, ts := range [...]time.Time{state.lastAddTime, state.lastDelTime} {
			if ts.After(latest) {
				latest = ts
			}
		}

		if now.Sub(latest) <= efc.cacheExpiry {
			continue
		}
		delete(efc.folderCounts, folder)
		evicted++
	}

	for bucketPath, policy := range efc.bucketCleanupPolicies {
		if age := now.Sub(policy.lastCheck); age > efc.cacheExpiry {
			delete(efc.bucketCleanupPolicies, bucketPath)
		}
	}

	if evicted > 0 {
		glog.V(3).Infof("EmptyFolderCleaner: evicted %d stale cache entries", evicted)
	}
}
// Stop stops the cleaner and cancels all pending tasks.
//
// It closes stopCh so the background goroutines exit, disables the cleaner,
// empties the cleanup queue and resets both caches. Stop is safe to call more
// than once: the channel is closed only on the first call, because closing an
// already-closed channel panics.
func (efc *EmptyFolderCleaner) Stop() {
	efc.mu.Lock()
	defer efc.mu.Unlock()

	// Nothing is ever sent on stopCh, so a successful receive means it has
	// already been closed by an earlier Stop. The check runs under mu so
	// concurrent Stop calls cannot both reach close.
	select {
	case <-efc.stopCh:
	default:
		close(efc.stopCh)
	}

	efc.enabled = false
	efc.cleanupQueue.Clear()
	efc.folderCounts = make(map[string]*folderState) // Clear cache on stop
	efc.bucketCleanupPolicies = make(map[string]*bucketCleanupPolicyState)
}
// GetPendingCleanupCount reports how many folders are currently waiting in
// the cleanup queue (for testing).
func (efc *EmptyFolderCleaner) GetPendingCleanupCount() int {
	pending := efc.cleanupQueue.Len()
	return pending
}
// GetCachedFolderCount looks up the cached rough entry count for folder
// (for testing). The boolean is false when the folder is not tracked.
func (efc *EmptyFolderCleaner) GetCachedFolderCount(folder string) (int, bool) {
	efc.mu.RLock()
	defer efc.mu.RUnlock()

	state, tracked := efc.folderCounts[folder]
	if !tracked {
		return 0, false
	}
	return state.roughCount, true
}