mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-29 21:20:21 +00:00
wdclient: prune filers dropped from master discovery (#9699)
* wdclient: prune filers dropped from master discovery Filer discovery only appended new addresses; it never removed ones that disappeared from the master snapshot. After a K8s filer pod rolled to a new IP the old address lingered in filerAddresses and got retried again every resetTimeout window, stalling S3 uploads on i/o timeouts. Treat the master snapshot as authoritative: keep survivors (preserving their health counters and the active round-robin index), append newcomers with fresh health, drop the rest. Empty snapshots are still ignored so a transient master outage can't wipe the list. * wdclient: skip discovery snapshots with no usable addresses Guard against the defensive case where master returns updates whose addresses are all empty; reconciling against an empty discovered set would prune every filer.
This commit is contained in:
@@ -342,25 +342,37 @@ func (fc *FilerClient) refreshFilerList() {
|
||||
return
|
||||
}
|
||||
|
||||
// Build new filer address list
|
||||
discoveredFilers := make(map[pb.ServerAddress]bool)
|
||||
// Build new filer address set
|
||||
discoveredFilers := make(map[pb.ServerAddress]struct{}, len(updates))
|
||||
for _, update := range updates {
|
||||
if update.Address != "" {
|
||||
discoveredFilers[pb.ServerAddress(update.Address)] = true
|
||||
discoveredFilers[pb.ServerAddress(update.Address)] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// Thread-safe update of filer list
|
||||
// Ignore snapshots whose addresses are all empty; reconciling against an
|
||||
// empty set would wipe the in-memory list.
|
||||
if len(discoveredFilers) == 0 {
|
||||
glog.V(1).Infof("FilerClient: discovery snapshot for group '%s' had no usable addresses, keeping existing list", fc.filerGroup)
|
||||
return
|
||||
}
|
||||
|
||||
fc.applyDiscoveredFilers(discoveredFilers)
|
||||
}
|
||||
|
||||
// applyDiscoveredFilers treats the master snapshot as authoritative: survivors
|
||||
// keep their health counters, new addresses get fresh health, addresses missing
|
||||
// from the snapshot are pruned so replaced pods (e.g. rolled K8s filer pods
|
||||
// with new IPs) don't linger and get retried after the circuit-breaker reset.
|
||||
func (fc *FilerClient) applyDiscoveredFilers(discoveredFilers map[pb.ServerAddress]struct{}) {
|
||||
fc.filerAddressesMu.Lock()
|
||||
defer fc.filerAddressesMu.Unlock()
|
||||
|
||||
// Build a map of existing filers for efficient O(1) lookup
|
||||
existingFilers := make(map[pb.ServerAddress]struct{}, len(fc.filerAddresses))
|
||||
for _, f := range fc.filerAddresses {
|
||||
existingFilers[f] = struct{}{}
|
||||
}
|
||||
|
||||
// Find new filers - O(N+M) instead of O(N*M)
|
||||
var newFilers []pb.ServerAddress
|
||||
for addr := range discoveredFilers {
|
||||
if _, found := existingFilers[addr]; !found {
|
||||
@@ -368,20 +380,57 @@ func (fc *FilerClient) refreshFilerList() {
|
||||
}
|
||||
}
|
||||
|
||||
// Add new filers
|
||||
if len(newFilers) > 0 {
|
||||
glog.V(0).Infof("FilerClient: discovered %d new filer(s) in group '%s': %v", len(newFilers), fc.filerGroup, newFilers)
|
||||
fc.filerAddresses = append(fc.filerAddresses, newFilers...)
|
||||
|
||||
// Initialize health tracking for new filers
|
||||
for range newFilers {
|
||||
fc.filerHealth = append(fc.filerHealth, &filerHealth{})
|
||||
var removedFilers []pb.ServerAddress
|
||||
for _, f := range fc.filerAddresses {
|
||||
if _, found := discoveredFilers[f]; !found {
|
||||
removedFilers = append(removedFilers, f)
|
||||
}
|
||||
}
|
||||
|
||||
// Optionally, remove filers that are no longer in the cluster
|
||||
// For now, we keep all filers and rely on health checks to avoid dead ones
|
||||
// This prevents removing filers that might be temporarily unavailable
|
||||
if len(newFilers) == 0 && len(removedFilers) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Remember the active filer so the round-robin pointer can follow it across the rebuild.
|
||||
currentIndex := atomic.LoadInt32(&fc.filerIndex)
|
||||
var currentFiler pb.ServerAddress
|
||||
if currentIndex >= 0 && currentIndex < int32(len(fc.filerAddresses)) {
|
||||
currentFiler = fc.filerAddresses[currentIndex]
|
||||
}
|
||||
|
||||
newAddresses := make([]pb.ServerAddress, 0, len(fc.filerAddresses)-len(removedFilers)+len(newFilers))
|
||||
newHealth := make([]*filerHealth, 0, cap(newAddresses))
|
||||
for i, f := range fc.filerAddresses {
|
||||
if _, found := discoveredFilers[f]; found {
|
||||
newAddresses = append(newAddresses, f)
|
||||
newHealth = append(newHealth, fc.filerHealth[i])
|
||||
}
|
||||
}
|
||||
for _, f := range newFilers {
|
||||
newAddresses = append(newAddresses, f)
|
||||
newHealth = append(newHealth, &filerHealth{})
|
||||
}
|
||||
|
||||
fc.filerAddresses = newAddresses
|
||||
fc.filerHealth = newHealth
|
||||
|
||||
var newIndex int32
|
||||
if currentFiler != "" {
|
||||
for i, f := range newAddresses {
|
||||
if f == currentFiler {
|
||||
newIndex = int32(i)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
atomic.StoreInt32(&fc.filerIndex, newIndex)
|
||||
|
||||
if len(removedFilers) > 0 {
|
||||
glog.V(0).Infof("FilerClient: removed %d filer(s) no longer in group '%s': %v", len(removedFilers), fc.filerGroup, removedFilers)
|
||||
}
|
||||
if len(newFilers) > 0 {
|
||||
glog.V(0).Infof("FilerClient: discovered %d new filer(s) in group '%s': %v", len(newFilers), fc.filerGroup, newFilers)
|
||||
}
|
||||
}
|
||||
|
||||
// GetLookupFileIdFunction returns a lookup function with URL preference handling
|
||||
|
||||
102
weed/wdclient/filer_client_test.go
Normal file
102
weed/wdclient/filer_client_test.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package wdclient
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
)
|
||||
|
||||
func newTestFilerClient(addrs ...pb.ServerAddress) *FilerClient {
|
||||
health := make([]*filerHealth, len(addrs))
|
||||
for i := range health {
|
||||
health[i] = &filerHealth{}
|
||||
}
|
||||
return &FilerClient{
|
||||
filerAddresses: addrs,
|
||||
filerHealth: health,
|
||||
}
|
||||
}
|
||||
|
||||
func filerAddressList(fc *FilerClient) []pb.ServerAddress {
|
||||
out := make([]pb.ServerAddress, len(fc.filerAddresses))
|
||||
copy(out, fc.filerAddresses)
|
||||
return out
|
||||
}
|
||||
|
||||
func TestApplyDiscoveredFilersPrunesStaleAddress(t *testing.T) {
|
||||
a := pb.ServerAddress("10.0.0.1:18888")
|
||||
b := pb.ServerAddress("10.0.0.2:18888") // gets replaced by c
|
||||
c := pb.ServerAddress("10.0.0.3:18888")
|
||||
|
||||
fc := newTestFilerClient(a, b)
|
||||
// Give b a non-trivial failure count so we can confirm it leaves with its health.
|
||||
atomic.StoreInt32(&fc.filerHealth[1].failureCount, 7)
|
||||
// Give a a known failure count so we can confirm survivor health is preserved.
|
||||
atomic.StoreInt32(&fc.filerHealth[0].failureCount, 2)
|
||||
atomic.StoreInt32(&fc.filerIndex, 1) // active filer is b
|
||||
|
||||
fc.applyDiscoveredFilers(map[pb.ServerAddress]struct{}{
|
||||
a: {},
|
||||
c: {},
|
||||
})
|
||||
|
||||
got := filerAddressList(fc)
|
||||
if len(got) != 2 || got[0] != a || got[1] != c {
|
||||
t.Fatalf("expected [%s %s], got %v", a, c, got)
|
||||
}
|
||||
if len(fc.filerHealth) != 2 {
|
||||
t.Fatalf("expected 2 health entries, got %d", len(fc.filerHealth))
|
||||
}
|
||||
if got := atomic.LoadInt32(&fc.filerHealth[0].failureCount); got != 2 {
|
||||
t.Errorf("survivor health was reset: want 2, got %d", got)
|
||||
}
|
||||
if got := atomic.LoadInt32(&fc.filerHealth[1].failureCount); got != 0 {
|
||||
t.Errorf("newly added filer should start with fresh health, got failureCount=%d", got)
|
||||
}
|
||||
// Active filer (b) disappeared; index must reset rather than dangle.
|
||||
if idx := atomic.LoadInt32(&fc.filerIndex); idx != 0 {
|
||||
t.Errorf("expected filerIndex reset to 0 after active filer removed, got %d", idx)
|
||||
}
|
||||
}
|
||||
|
||||
func TestApplyDiscoveredFilersKeepsIndexOnSurvivor(t *testing.T) {
|
||||
a := pb.ServerAddress("10.0.0.1:18888") // gets replaced
|
||||
b := pb.ServerAddress("10.0.0.2:18888") // active, survives
|
||||
c := pb.ServerAddress("10.0.0.3:18888") // new
|
||||
|
||||
fc := newTestFilerClient(a, b)
|
||||
atomic.StoreInt32(&fc.filerIndex, 1) // active filer is b
|
||||
|
||||
fc.applyDiscoveredFilers(map[pb.ServerAddress]struct{}{
|
||||
b: {},
|
||||
c: {},
|
||||
})
|
||||
|
||||
got := filerAddressList(fc)
|
||||
if len(got) != 2 || got[0] != b || got[1] != c {
|
||||
t.Fatalf("expected [%s %s], got %v", b, c, got)
|
||||
}
|
||||
// b moved to index 0 after a was pruned; index should follow.
|
||||
if idx := atomic.LoadInt32(&fc.filerIndex); idx != 0 {
|
||||
t.Errorf("expected filerIndex to follow surviving active filer to position 0, got %d", idx)
|
||||
}
|
||||
}
|
||||
|
||||
func TestApplyDiscoveredFilersNoChangeIsNoop(t *testing.T) {
|
||||
a := pb.ServerAddress("10.0.0.1:18888")
|
||||
b := pb.ServerAddress("10.0.0.2:18888")
|
||||
|
||||
fc := newTestFilerClient(a, b)
|
||||
originalHealthA := fc.filerHealth[0]
|
||||
originalHealthB := fc.filerHealth[1]
|
||||
|
||||
fc.applyDiscoveredFilers(map[pb.ServerAddress]struct{}{
|
||||
a: {},
|
||||
b: {},
|
||||
})
|
||||
|
||||
if fc.filerHealth[0] != originalHealthA || fc.filerHealth[1] != originalHealthB {
|
||||
t.Errorf("no-op refresh should not reallocate health entries")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user