mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-15 22:31:28 +00:00
Fix race conditions and make Close() idempotent
Address CodeRabbit review #3512078995: 1. **Critical: Fix unsynchronized read in error message** - Line 584 read len(fc.filerAddresses) without lock - Race with refreshFilerList appending to slice - Fixed: Take RLock to read length safely - Prevents race detector warnings 2. **Important: Make Close() idempotent** - Closing already-closed channel panics - Can happen with layered cleanup in shutdown paths - Fixed: Use sync.Once to ensure single close - Safe to call Close() multiple times now 3. **Nitpick: Add warning for empty filer address** - getFilerAddress() can return empty string - Helps diagnose unexpected state - Added: Warning log when no filers available 4. **Nitpick: Guard deprecated index-based helpers** - shouldSkipUnhealthyFiler, recordFilerSuccess/Failure - Accessed filerHealth without lock (races with discovery) - Fixed: Take RLock and check bounds before array access - Prevents index out of bounds and races Changes: - filer_client.go: - Add closeDiscoveryOnce sync.Once field - Use Do() in Close() for idempotent channel close - Add RLock guards to deprecated index-based helpers - Add bounds checking to prevent panics - Synchronized read of filerAddresses length in error - s3api_server.go: - Add warning log when getFilerAddress returns empty Benefits: - No race conditions (passes race detector) - No panic on double-close - Better error diagnostics - Safe with discovery enabled - Production-hardened shutdown logic
This commit is contained in:
@@ -221,6 +221,7 @@ func (s3a *S3ApiServer) getFilerAddress() pb.ServerAddress {
|
||||
if len(s3a.option.Filers) > 0 {
|
||||
return s3a.option.Filers[0]
|
||||
}
|
||||
glog.Warningf("getFilerAddress: no filer addresses available")
|
||||
return ""
|
||||
}
|
||||
|
||||
|
||||
@@ -56,10 +56,11 @@ type FilerClient struct {
|
||||
retryBackoffFactor float64 // Retry: backoff multiplier for wait time
|
||||
|
||||
// Filer discovery fields
|
||||
masterClient *MasterClient // Optional: for discovering filers in the same group
|
||||
filerGroup string // Optional: filer group for discovery
|
||||
discoveryInterval time.Duration // How often to refresh filer list from master
|
||||
stopDiscovery chan struct{} // Signal to stop discovery goroutine
|
||||
masterClient *MasterClient // Optional: for discovering filers in the same group
|
||||
filerGroup string // Optional: filer group for discovery
|
||||
discoveryInterval time.Duration // How often to refresh filer list from master
|
||||
stopDiscovery chan struct{} // Signal to stop discovery goroutine
|
||||
closeDiscoveryOnce sync.Once // Ensures discovery channel is closed at most once
|
||||
}
|
||||
|
||||
// filerVolumeProvider implements VolumeLocationProvider by querying filer
|
||||
@@ -205,9 +206,12 @@ func (fc *FilerClient) GetCurrentFiler() pb.ServerAddress {
|
||||
}
|
||||
|
||||
// Close stops the filer discovery goroutine if running
|
||||
// Safe to call multiple times (idempotent)
|
||||
func (fc *FilerClient) Close() {
|
||||
if fc.stopDiscovery != nil {
|
||||
close(fc.stopDiscovery)
|
||||
fc.closeDiscoveryOnce.Do(func() {
|
||||
close(fc.stopDiscovery)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -415,8 +419,15 @@ func (fc *FilerClient) shouldSkipUnhealthyFilerWithHealth(health *filerHealth) b
|
||||
|
||||
// Deprecated: Use shouldSkipUnhealthyFilerWithHealth instead
|
||||
// This function is kept for backward compatibility but requires array access
|
||||
// Note: Accesses filerHealth without lock; safe only when discovery is disabled
|
||||
func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
|
||||
fc.filerAddressesMu.RLock()
|
||||
if index >= int32(len(fc.filerHealth)) {
|
||||
fc.filerAddressesMu.RUnlock()
|
||||
return true // Invalid index - skip
|
||||
}
|
||||
health := fc.filerHealth[index]
|
||||
fc.filerAddressesMu.RUnlock()
|
||||
return fc.shouldSkipUnhealthyFilerWithHealth(health)
|
||||
}
|
||||
|
||||
@@ -427,7 +438,14 @@ func (fc *FilerClient) recordFilerSuccessWithHealth(health *filerHealth) {
|
||||
|
||||
// recordFilerSuccess resets failure tracking for a successful filer
|
||||
func (fc *FilerClient) recordFilerSuccess(index int32) {
|
||||
fc.recordFilerSuccessWithHealth(fc.filerHealth[index])
|
||||
fc.filerAddressesMu.RLock()
|
||||
if index >= int32(len(fc.filerHealth)) {
|
||||
fc.filerAddressesMu.RUnlock()
|
||||
return // Invalid index
|
||||
}
|
||||
health := fc.filerHealth[index]
|
||||
fc.filerAddressesMu.RUnlock()
|
||||
fc.recordFilerSuccessWithHealth(health)
|
||||
}
|
||||
|
||||
// recordFilerFailureWithHealth increments failure count for an unhealthy filer
|
||||
@@ -438,7 +456,14 @@ func (fc *FilerClient) recordFilerFailureWithHealth(health *filerHealth) {
|
||||
|
||||
// recordFilerFailure increments failure count for an unhealthy filer
|
||||
func (fc *FilerClient) recordFilerFailure(index int32) {
|
||||
fc.recordFilerFailureWithHealth(fc.filerHealth[index])
|
||||
fc.filerAddressesMu.RLock()
|
||||
if index >= int32(len(fc.filerHealth)) {
|
||||
fc.filerAddressesMu.RUnlock()
|
||||
return // Invalid index
|
||||
}
|
||||
health := fc.filerHealth[index]
|
||||
fc.filerAddressesMu.RUnlock()
|
||||
fc.recordFilerFailureWithHealth(health)
|
||||
}
|
||||
|
||||
// LookupVolumeIds queries the filer for volume locations with automatic failover
|
||||
@@ -580,5 +605,8 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
|
||||
}
|
||||
|
||||
// All retries exhausted
|
||||
return nil, fmt.Errorf("all %d filer(s) failed after %d attempts, last error: %w", len(fc.filerAddresses), maxRetries, lastErr)
|
||||
fc.filerAddressesMu.RLock()
|
||||
totalFilers := len(fc.filerAddresses)
|
||||
fc.filerAddressesMu.RUnlock()
|
||||
return nil, fmt.Errorf("all %d filer(s) failed after %d attempts, last error: %w", totalFilers, maxRetries, lastErr)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user