From 7919cc7ca0eda1f7233c520dfc6ef47813b6dd9f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 26 May 2026 17:49:18 -0700 Subject: [PATCH] wdclient: prune filers dropped from master discovery (#9699) * wdclient: prune filers dropped from master discovery Filer discovery only appended new addresses; it never removed ones that disappeared from the master snapshot. After a K8s filer pod rolled to a new IP the old address lingered in filerAddresses and got retried again every resetTimeout window, stalling S3 uploads on i/o timeouts. Treat the master snapshot as authoritative: keep survivors (preserving their health counters and the active round-robin index), append newcomers with fresh health, drop the rest. Empty snapshots are still ignored so a transient master outage can't wipe the list. * wdclient: skip discovery snapshots with no usable addresses Guard against the defensive case where master returns updates whose addresses are all empty; reconciling against an empty discovered set would prune every filer. --- weed/wdclient/filer_client.go | 83 ++++++++++++++++++----- weed/wdclient/filer_client_test.go | 102 +++++++++++++++++++++++++++++ 2 files changed, 168 insertions(+), 17 deletions(-) create mode 100644 weed/wdclient/filer_client_test.go diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go index ace4a6a7e..bff17ed0b 100644 --- a/weed/wdclient/filer_client.go +++ b/weed/wdclient/filer_client.go @@ -342,25 +342,37 @@ func (fc *FilerClient) refreshFilerList() { return } - // Build new filer address list - discoveredFilers := make(map[pb.ServerAddress]bool) + // Build new filer address set + discoveredFilers := make(map[pb.ServerAddress]struct{}, len(updates)) for _, update := range updates { if update.Address != "" { - discoveredFilers[pb.ServerAddress(update.Address)] = true + discoveredFilers[pb.ServerAddress(update.Address)] = struct{}{} } } - // Thread-safe update of filer list + // Ignore snapshots whose addresses are all empty; reconciling against an + // empty set would wipe the in-memory list. + if len(discoveredFilers) == 0 { + glog.V(1).Infof("FilerClient: discovery snapshot for group '%s' had no usable addresses, keeping existing list", fc.filerGroup) + return + } + + fc.applyDiscoveredFilers(discoveredFilers) +} + +// applyDiscoveredFilers treats the master snapshot as authoritative: survivors +// keep their health counters, new addresses get fresh health, addresses missing +// from the snapshot are pruned so replaced pods (e.g. rolled K8s filer pods +// with new IPs) don't linger and get retried after the circuit-breaker reset. +func (fc *FilerClient) applyDiscoveredFilers(discoveredFilers map[pb.ServerAddress]struct{}) { fc.filerAddressesMu.Lock() defer fc.filerAddressesMu.Unlock() - // Build a map of existing filers for efficient O(1) lookup existingFilers := make(map[pb.ServerAddress]struct{}, len(fc.filerAddresses)) for _, f := range fc.filerAddresses { existingFilers[f] = struct{}{} } - // Find new filers - O(N+M) instead of O(N*M) var newFilers []pb.ServerAddress for addr := range discoveredFilers { if _, found := existingFilers[addr]; !found { @@ -368,20 +380,57 @@ func (fc *FilerClient) refreshFilerList() { } } - // Add new filers - if len(newFilers) > 0 { - glog.V(0).Infof("FilerClient: discovered %d new filer(s) in group '%s': %v", len(newFilers), fc.filerGroup, newFilers) - fc.filerAddresses = append(fc.filerAddresses, newFilers...) - - // Initialize health tracking for new filers - for range newFilers { - fc.filerHealth = append(fc.filerHealth, &filerHealth{}) + var removedFilers []pb.ServerAddress + for _, f := range fc.filerAddresses { + if _, found := discoveredFilers[f]; !found { + removedFilers = append(removedFilers, f) } } - // Optionally, remove filers that are no longer in the cluster - // For now, we keep all filers and rely on health checks to avoid dead ones - // This prevents removing filers that might be temporarily unavailable + if len(newFilers) == 0 && len(removedFilers) == 0 { + return + } + + // Remember the active filer so the round-robin pointer can follow it across the rebuild. + currentIndex := atomic.LoadInt32(&fc.filerIndex) + var currentFiler pb.ServerAddress + if currentIndex >= 0 && currentIndex < int32(len(fc.filerAddresses)) { + currentFiler = fc.filerAddresses[currentIndex] + } + + newAddresses := make([]pb.ServerAddress, 0, len(fc.filerAddresses)-len(removedFilers)+len(newFilers)) + newHealth := make([]*filerHealth, 0, cap(newAddresses)) + for i, f := range fc.filerAddresses { + if _, found := discoveredFilers[f]; found { + newAddresses = append(newAddresses, f) + newHealth = append(newHealth, fc.filerHealth[i]) + } + } + for _, f := range newFilers { + newAddresses = append(newAddresses, f) + newHealth = append(newHealth, &filerHealth{}) + } + + fc.filerAddresses = newAddresses + fc.filerHealth = newHealth + + var newIndex int32 + if currentFiler != "" { + for i, f := range newAddresses { + if f == currentFiler { + newIndex = int32(i) + break + } + } + } + atomic.StoreInt32(&fc.filerIndex, newIndex) + + if len(removedFilers) > 0 { + glog.V(0).Infof("FilerClient: removed %d filer(s) no longer in group '%s': %v", len(removedFilers), fc.filerGroup, removedFilers) + } + if len(newFilers) > 0 { + glog.V(0).Infof("FilerClient: discovered %d new filer(s) in group '%s': %v", len(newFilers), fc.filerGroup, newFilers) + } } // GetLookupFileIdFunction returns a lookup function with URL preference handling diff --git a/weed/wdclient/filer_client_test.go b/weed/wdclient/filer_client_test.go new file mode 100644 index 000000000..1ab8b9a87 --- /dev/null +++ b/weed/wdclient/filer_client_test.go @@ -0,0 +1,102 @@ +package wdclient + +import ( + "sync/atomic" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb" +) + +func newTestFilerClient(addrs ...pb.ServerAddress) *FilerClient { + health := make([]*filerHealth, len(addrs)) + for i := range health { + health[i] = &filerHealth{} + } + return &FilerClient{ + filerAddresses: addrs, + filerHealth: health, + } +} + +func filerAddressList(fc *FilerClient) []pb.ServerAddress { + out := make([]pb.ServerAddress, len(fc.filerAddresses)) + copy(out, fc.filerAddresses) + return out +} + +func TestApplyDiscoveredFilersPrunesStaleAddress(t *testing.T) { + a := pb.ServerAddress("10.0.0.1:18888") + b := pb.ServerAddress("10.0.0.2:18888") // gets replaced by c + c := pb.ServerAddress("10.0.0.3:18888") + + fc := newTestFilerClient(a, b) + // Give b a non-trivial failure count so we can confirm it leaves with its health. + atomic.StoreInt32(&fc.filerHealth[1].failureCount, 7) + // Give a a known failure count so we can confirm survivor health is preserved. + atomic.StoreInt32(&fc.filerHealth[0].failureCount, 2) + atomic.StoreInt32(&fc.filerIndex, 1) // active filer is b + + fc.applyDiscoveredFilers(map[pb.ServerAddress]struct{}{ + a: {}, + c: {}, + }) + + got := filerAddressList(fc) + if len(got) != 2 || got[0] != a || got[1] != c { + t.Fatalf("expected [%s %s], got %v", a, c, got) + } + if len(fc.filerHealth) != 2 { + t.Fatalf("expected 2 health entries, got %d", len(fc.filerHealth)) + } + if got := atomic.LoadInt32(&fc.filerHealth[0].failureCount); got != 2 { + t.Errorf("survivor health was reset: want 2, got %d", got) + } + if got := atomic.LoadInt32(&fc.filerHealth[1].failureCount); got != 0 { + t.Errorf("newly added filer should start with fresh health, got failureCount=%d", got) + } + // Active filer (b) disappeared; index must reset rather than dangle. + if idx := atomic.LoadInt32(&fc.filerIndex); idx != 0 { + t.Errorf("expected filerIndex reset to 0 after active filer removed, got %d", idx) + } +} + +func TestApplyDiscoveredFilersKeepsIndexOnSurvivor(t *testing.T) { + a := pb.ServerAddress("10.0.0.1:18888") // gets replaced + b := pb.ServerAddress("10.0.0.2:18888") // active, survives + c := pb.ServerAddress("10.0.0.3:18888") // new + + fc := newTestFilerClient(a, b) + atomic.StoreInt32(&fc.filerIndex, 1) // active filer is b + + fc.applyDiscoveredFilers(map[pb.ServerAddress]struct{}{ + b: {}, + c: {}, + }) + + got := filerAddressList(fc) + if len(got) != 2 || got[0] != b || got[1] != c { + t.Fatalf("expected [%s %s], got %v", b, c, got) + } + // b moved to index 0 after a was pruned; index should follow. + if idx := atomic.LoadInt32(&fc.filerIndex); idx != 0 { + t.Errorf("expected filerIndex to follow surviving active filer to position 0, got %d", idx) + } +} + +func TestApplyDiscoveredFilersNoChangeIsNoop(t *testing.T) { + a := pb.ServerAddress("10.0.0.1:18888") + b := pb.ServerAddress("10.0.0.2:18888") + + fc := newTestFilerClient(a, b) + originalHealthA := fc.filerHealth[0] + originalHealthB := fc.filerHealth[1] + + fc.applyDiscoveredFilers(map[pb.ServerAddress]struct{}{ + a: {}, + b: {}, + }) + + if fc.filerHealth[0] != originalHealthA || fc.filerHealth[1] != originalHealthB { + t.Errorf("no-op refresh should not reallocate health entries") + } +}