From 73f10fa52804fc22be1b1e34c98577b74bbaec6d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 18 Apr 2026 21:42:36 -0700 Subject: [PATCH] peer chunk sharing 6/8: announce queue + batched flush (#9135) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit mount: batched announcer + pooled peer conns for mount-to-mount RPCs * peer_announcer.go: non-blocking EnqueueAnnounce + ticker flush that groups fids by HRW owner, fans out one ChunkAnnounce per owner in parallel. announcedAt is pruned at 2× TTL so it stays bounded. * peer_dialer.go: PeerConnPool caches one grpc.ClientConn per peer address; the announcer and (next PR) the fetcher share it so steady-state owner RPCs skip the handshake cost entirely. Bounded at 4096 cached entries; shutdown conns are transparently replaced. * WFS starts both alongside the gRPC server; stops them on unmount. --- weed/mount/peer_announcer.go | 374 ++++++++++++++++++++++++++++++ weed/mount/peer_announcer_test.go | 374 ++++++++++++++++++++++++++++++ weed/mount/peer_dialer.go | 137 +++++++++++ weed/mount/weedfs.go | 34 +++ weed/mount/weedfs_write.go | 22 +- 5 files changed, 940 insertions(+), 1 deletion(-) create mode 100644 weed/mount/peer_announcer.go create mode 100644 weed/mount/peer_announcer_test.go create mode 100644 weed/mount/peer_dialer.go diff --git a/weed/mount/peer_announcer.go b/weed/mount/peer_announcer.go new file mode 100644 index 000000000..e7469e592 --- /dev/null +++ b/weed/mount/peer_announcer.go @@ -0,0 +1,374 @@ +package mount + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/mount_peer_pb" +) + +// PeerAnnouncer batches and flushes ChunkAnnounce RPCs to the +// HRW-assigned owner mounts for each cached fid this mount holds. +// +// Shape: +// * EnqueueAnnounce(fid) is a non-blocking push into an in-memory set. +// * A background ticker flushes every announceInterval. 
Each flush +// drains the set, adds fids due for TTL renewal, groups by owner +// mount via HRW, and sends one ChunkAnnounce RPC per distinct owner. +// * A successful send records announced_at[fid]. A rejected (not-owner) +// fid is requeued so it will be retried after the seed view refreshes. +// +// See design-weed-mount-peer-chunk-sharing.md §4.4. +type PeerAnnouncer struct { + selfAddr string + selfDataCenter string + selfRack string + ownerFor func(fid string) string + dialPeer MountPeerDialer + localDir *PeerDirectory // populated directly for fids whose HRW owner == self + isCached func(fid string) bool // residency check; set nil to disable + announceInterval time.Duration + announceTTL time.Duration + + mu sync.Mutex + pending map[string]struct{} + announcedAt map[string]announceRecord + + stopCh chan struct{} + stopped atomic.Bool + runWg sync.WaitGroup // tracks the run goroutine so Stop can wait + + // flushCancelMu guards flushCancel, which is the cancel fn for the + // currently in-flight flush (nil when idle). Stop takes the lock, + // cancels the flush to unblock its RPCs fast, then waits on runWg. + flushCancelMu sync.Mutex + flushCancel context.CancelFunc + + flushes atomic.Int64 + sentFids atomic.Int64 + rejectedFids atomic.Int64 + flushErrs atomic.Int64 + + clock func() time.Time +} + +// MountPeerDialer opens a MountPeer gRPC client to a given peer. Tests +// inject a fake; production uses a real gRPC dial backed by a short +// connection cache. +type MountPeerDialer func(ctx context.Context, peerAddr string) (mount_peer_pb.MountPeerClient, func(), error) + +// announceRecord is what we remember about a successful announce: who +// we told (the HRW owner at the time) and when. On each flush we +// recompute the HRW owner and, if it moved — typically because a new +// mount joined the seed list and the ring rebalanced — re-announce +// against the new owner. 
Without this, seed-view divergence during +// startup silently ages out fids on an owner that the reader never +// looks up. +type announceRecord struct { + owner string + at time.Time +} + +// NewPeerAnnouncer constructs an announcer. Caller must call Start. +// selfDataCenter and selfRack are the locality labels attached to every +// ChunkAnnounce so the receiving directory records holders with DC+rack +// and the fetcher can later re-rank by locality. +// +// localDir is optional: when non-nil, any fid whose HRW owner resolves +// to self is written directly into localDir instead of sending a +// self→self RPC (which the receiver would reject anyway). This is the +// only way a fid the writer itself owns becomes visible to peers' +// ChunkLookup calls. +func NewPeerAnnouncer(selfAddr, selfDataCenter, selfRack string, ownerFor func(fid string) string, dial MountPeerDialer, localDir *PeerDirectory) *PeerAnnouncer { + return &PeerAnnouncer{ + selfAddr: selfAddr, + selfDataCenter: selfDataCenter, + selfRack: selfRack, + ownerFor: ownerFor, + dialPeer: dial, + localDir: localDir, + announceInterval: 15 * time.Second, + announceTTL: 300 * time.Second, + pending: map[string]struct{}{}, + announcedAt: map[string]announceRecord{}, + stopCh: make(chan struct{}), + clock: time.Now, + } +} + +// EnqueueAnnounce marks a fid as needing announcement. Non-blocking; +// dedupes within a flush window. +func (a *PeerAnnouncer) EnqueueAnnounce(fid string) { + a.mu.Lock() + a.pending[fid] = struct{}{} + a.mu.Unlock() +} + +// SetCachePresence wires an optional residency check. When set, the +// announcer consults it right before dispatching a ChunkAnnounce RPC or +// writing a self-owned entry: if the cache no longer has the chunk +// (e.g. LRU-evicted under memory pressure between SetChunk and the +// next flush tick) we drop the announce rather than advertise bytes we +// can't serve back. nil disables the check. 
+func (a *PeerAnnouncer) SetCachePresence(isCached func(fid string) bool) { + // a.mu also guards isCached: flushOnce reads the field under the same lock. + a.mu.Lock() + defer a.mu.Unlock() + a.isCached = isCached +} + +// Start launches the background flush loop. Every call adds one run +// goroutine to runWg; nothing here guards against a second call. +func (a *PeerAnnouncer) Start() { + a.runWg.Add(1) + go func() { + defer a.runWg.Done() + a.run() + }() +} + +// Stop halts the flush loop and waits for the current flush (if any) to +// finish. Safe to call multiple times. Callers that tear down shared +// dependencies (conn pool, local directory) can rely on no more +// sendTo/localDir.Announce goroutines being live after Stop returns. +func (a *PeerAnnouncer) Stop() { + // Swap makes Stop idempotent: only the first caller closes stopCh. + if a.stopped.Swap(true) { + return + } + close(a.stopCh) + // Cancel the in-flight flush's context so its RPCs return quickly + // instead of running out the 10 s timeout. + // NOTE(review): run() installs flushCancel only after it has taken a + // tick, so a tick picked up concurrently with Stop may register its + // cancel func after this check. That flush is not cancelled here; + // Stop then waits for it, presumably bounded by announceFlushTimeout. + a.flushCancelMu.Lock() + if a.flushCancel != nil { + a.flushCancel() + } + a.flushCancelMu.Unlock() + a.runWg.Wait() +} + +// Stats exposes counters for observability. Values are returned in this +// order: flushOnce invocations, fids recorded as announced, fids the +// owner rejected, and dial/RPC failures. +func (a *PeerAnnouncer) Stats() (flushes, sentFids, rejectedFids, flushErrs int64) { + return a.flushes.Load(), a.sentFids.Load(), a.rejectedFids.Load(), a.flushErrs.Load() +} + +// announceFlushTimeout caps how long a single flush cycle can run. A slow +// or unresponsive owner peer would otherwise block a flush goroutine +// indefinitely, and since flushes fan out per owner in parallel a +// pileup would bleed into subsequent ticks and grow unbounded. The +// bound is set a bit under one flush interval so the next tick's flush +// doesn't chase the previous one's tail. 
+const announceFlushTimeout = 10 * time.Second + +func (a *PeerAnnouncer) run() { + t := time.NewTicker(a.announceInterval) + defer t.Stop() + for { + select { + case <-a.stopCh: + return + case <-t.C: + ctx, cancel := context.WithTimeout(context.Background(), announceFlushTimeout) + a.flushCancelMu.Lock() + a.flushCancel = cancel + a.flushCancelMu.Unlock() + + a.flushOnce(ctx) + + a.flushCancelMu.Lock() + a.flushCancel = nil + a.flushCancelMu.Unlock() + cancel() + } + } +} + +// flushOnce is exported via a wrapper for tests. +func (a *PeerAnnouncer) FlushForTest(ctx context.Context) { + a.flushOnce(ctx) +} + +func (a *PeerAnnouncer) flushOnce(ctx context.Context) { + a.flushes.Add(1) + now := a.clock() + + a.mu.Lock() + // Swap pending in O(1) instead of copying. + toAnnounce := a.pending + a.pending = map[string]struct{}{} + + // Add renewals: (1) fids we announced long enough ago that they'll + // expire on the owner side before our next flush tick; (2) fids + // whose HRW owner has changed since we last announced them — + // usually because a new mount joined the seed list and the ring + // rebalanced. Also prune announcedAt entries that are so old + // (> 2× TTL) that re-announcing them would be pointless. + renewThreshold := now.Add(-a.announceTTL + 2*a.announceInterval) + staleCutoff := now.Add(-2 * a.announceTTL) + for fid, rec := range a.announcedAt { + switch { + case rec.at.Before(staleCutoff): + delete(a.announcedAt, fid) + case rec.at.Before(renewThreshold): + toAnnounce[fid] = struct{}{} + default: + // Same fid, fresh timestamp — but owner may have moved. + // Force a re-announce if HRW says a different mount is + // the owner now. + if a.ownerFor != nil { + if cur := a.ownerFor(fid); cur != "" && cur != rec.owner { + toAnnounce[fid] = struct{}{} + } + } + } + } + a.mu.Unlock() + + if len(toAnnounce) == 0 { + return + } + + // Drop fids the cache no longer has. 
The write path hooks into + // SetChunk + EnqueueAnnounce, but the chunk can be LRU-evicted in + // the window between that and the next 15 s flush. Advertising a + // chunk we can't actually serve just sends remote fetchers to our + // FetchChunk, which then returns NOT_FOUND — a wasted round trip. + // Also drop the announcedAt record so we don't keep renewing + // evicted fids forever. + a.mu.Lock() + present := a.isCached + a.mu.Unlock() + if present != nil { + for fid := range toAnnounce { + if !present(fid) { + delete(toAnnounce, fid) + a.mu.Lock() + delete(a.announcedAt, fid) + a.mu.Unlock() + } + } + if len(toAnnounce) == 0 { + return + } + } + + // Classify each fid. Three buckets: + // byOwner — HRW owner is a remote mount; send a ChunkAnnounce RPC. + // selfOwned — HRW owner is us; write directly into our local + // directory (no RPC — we ARE the directory for this + // fid). Without this, fids we wrote AND HRW-own stay + // invisible to peer ChunkLookup forever. + // deferred — HRW owner unknown (empty seed view). Re-queue for + // the next flush once listOnce has populated seeds. + byOwner := map[string][]string{} + var selfOwned []string + var deferred []string + for fid := range toAnnounce { + owner := "" + if a.ownerFor != nil { + owner = a.ownerFor(fid) + } + switch owner { + case "": + deferred = append(deferred, fid) + case a.selfAddr: + selfOwned = append(selfOwned, fid) + default: + byOwner[owner] = append(byOwner[owner], fid) + } + } + if len(selfOwned) > 0 && a.localDir != nil { + a.localDir.Announce(a.selfAddr, a.selfDataCenter, a.selfRack, selfOwned, a.announceTTL, nil) + a.mu.Lock() + for _, fid := range selfOwned { + a.announcedAt[fid] = announceRecord{owner: a.selfAddr, at: now} + a.sentFids.Add(1) + } + a.mu.Unlock() + } else if len(selfOwned) > 0 { + // No local directory wired (e.g. unit tests with nil). Re-queue + // so a future flush with a possibly-different HRW owner can + // send them. + deferred = append(deferred, selfOwned...) 
+ } + if len(deferred) > 0 { + a.mu.Lock() + for _, fid := range deferred { + a.pending[fid] = struct{}{} + } + a.mu.Unlock() + } + + // Diagnostic V(2): shape of the current flush. Helps post-mortem + // debugging when an announce pipeline silently stops making progress + // (e.g. seed view stuck at self-only, owner unreachable, etc). + if len(byOwner) > 0 || len(selfOwned) > 0 || len(deferred) > 0 { + glog.V(2).Infof("peer-announce flush: toAnnounce=%d byOwner=%d selfOwned=%d deferred=%d", + len(toAnnounce), len(byOwner), len(selfOwned), len(deferred)) + } + + // Fan out one RPC per owner in parallel. A slow or unreachable + // owner blocks only its own goroutine; other owners' announces + // proceed immediately. + var wg sync.WaitGroup + for owner, fids := range byOwner { + wg.Add(1) + go func(owner string, fids []string) { + defer wg.Done() + a.sendTo(ctx, owner, fids, now) + }(owner, fids) + } + wg.Wait() +} + +func (a *PeerAnnouncer) sendTo(ctx context.Context, owner string, fids []string, now time.Time) { + client, closeFn, err := a.dialPeer(ctx, owner) + if err != nil { + a.flushErrs.Add(1) + // Requeue these fids: a future flush will retry. + a.requeue(fids) + glog.V(2).Infof("peer-announce dial %s: %v", owner, err) + return + } + defer closeFn() + + resp, err := client.ChunkAnnounce(ctx, &mount_peer_pb.ChunkAnnounceRequest{ + FileIds: fids, + PeerAddr: a.selfAddr, + DataCenter: a.selfDataCenter, + Rack: a.selfRack, + TtlSeconds: int32(a.announceTTL / time.Second), + }) + if err != nil { + a.flushErrs.Add(1) + a.requeue(fids) + glog.V(2).Infof("peer-announce %s: %v", owner, err) + return + } + + rejected := map[string]struct{}{} + for _, f := range resp.RejectedFileIds { + rejected[f] = struct{}{} + } + + a.mu.Lock() + for _, fid := range fids { + if _, skip := rejected[fid]; skip { + // Owner has changed or doesn't yet agree; retry later. 
+ a.pending[fid] = struct{}{} + a.rejectedFids.Add(1) + continue + } + a.announcedAt[fid] = announceRecord{owner: owner, at: now} + a.sentFids.Add(1) + } + a.mu.Unlock() +} + +func (a *PeerAnnouncer) requeue(fids []string) { + a.mu.Lock() + for _, fid := range fids { + a.pending[fid] = struct{}{} + } + a.mu.Unlock() +} diff --git a/weed/mount/peer_announcer_test.go b/weed/mount/peer_announcer_test.go new file mode 100644 index 000000000..063c2b2f1 --- /dev/null +++ b/weed/mount/peer_announcer_test.go @@ -0,0 +1,374 @@ +package mount + +import ( + "context" + "sort" + "sync" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/mount_peer_pb" + "google.golang.org/grpc" +) + +// fakeMountPeerClient records each ChunkAnnounce call; returns rejected +// fids from a configured set. +type fakeMountPeerClient struct { + mu sync.Mutex + mount_peer_pb.MountPeerClient + announcedBy map[string][]filerAnnouncement + rejectedByClient func(fid string) bool +} + +type filerAnnouncement struct { + fids []string + peerAddr string +} + +func (f *fakeMountPeerClient) ChunkAnnounce(ctx context.Context, req *mount_peer_pb.ChunkAnnounceRequest, opts ...grpc.CallOption) (*mount_peer_pb.ChunkAnnounceResponse, error) { + f.mu.Lock() + defer f.mu.Unlock() + // No owner key in the request; capture keyed by peer_addr (the announcer). + f.announcedBy[req.PeerAddr] = append(f.announcedBy[req.PeerAddr], filerAnnouncement{ + fids: append([]string(nil), req.FileIds...), + peerAddr: req.PeerAddr, + }) + resp := &mount_peer_pb.ChunkAnnounceResponse{} + if f.rejectedByClient != nil { + for _, fid := range req.FileIds { + if f.rejectedByClient(fid) { + resp.RejectedFileIds = append(resp.RejectedFileIds, fid) + } + } + } + return resp, nil +} + +// fakeDialer returns the same fakeMountPeerClient for every peer addr, +// recording which owner the announcer dialed. Close is a no-op. 
The +// dialed slice is protected by its own mutex because the announcer now +// fans RPCs out across goroutines — concurrent appends would race. +type dialRecorder struct { + mu sync.Mutex + dialled []string +} + +func (d *dialRecorder) record(addr string) { + d.mu.Lock() + d.dialled = append(d.dialled, addr) + d.mu.Unlock() +} + +func (d *dialRecorder) snapshot() []string { + d.mu.Lock() + defer d.mu.Unlock() + out := make([]string, len(d.dialled)) + copy(out, d.dialled) + return out +} + +func fakeDialer(fc *fakeMountPeerClient, rec *dialRecorder) MountPeerDialer { + return func(ctx context.Context, peerAddr string) (mount_peer_pb.MountPeerClient, func(), error) { + rec.record(peerAddr) + return fc, func() {}, nil + } +} + +func TestPeerAnnouncer_FlushGroupsByOwner(t *testing.T) { + fc := &fakeMountPeerClient{ + announcedBy: map[string][]filerAnnouncement{}, + } + rec := &dialRecorder{} + + // Owner function: static assignment for test. + ownerFor := func(fid string) string { + if fid == "3,a" || fid == "3,c" { + return "owner-1:18081" + } + return "owner-2:18081" + } + + a := NewPeerAnnouncer("self:18080", "", "",ownerFor, fakeDialer(fc, rec), nil) + + a.EnqueueAnnounce("3,a") + a.EnqueueAnnounce("3,b") + a.EnqueueAnnounce("3,c") + + a.FlushForTest(context.Background()) + + dialed := rec.snapshot(); sort.Strings(dialed) + if len(dialed) != 2 || dialed[0] != "owner-1:18081" || dialed[1] != "owner-2:18081" { + t.Errorf("expected both owners dialed once, got %v", dialed) + } + + _, sent, _, _ := a.Stats() + if sent != 3 { + t.Errorf("expected 3 sent fids, got %d", sent) + } +} + +func TestPeerAnnouncer_SkipOwnerEqualsSelf(t *testing.T) { + fc := &fakeMountPeerClient{announcedBy: map[string][]filerAnnouncement{}} + rec := &dialRecorder{} + + a := NewPeerAnnouncer("self:18080", "", "",func(fid string) string { + return "self:18080" + }, fakeDialer(fc, rec), nil) + + a.EnqueueAnnounce("3,a") + a.FlushForTest(context.Background()) + + if d := rec.snapshot(); len(d) != 
0 { + t.Errorf("expected no dial when owner == self, dialed: %v", d) + } + // Self-owned fids stay in pending so a subsequent flush re-checks + // them against a possibly-refreshed seed view. Without this retry a + // mount whose initial MountList hadn't yet observed its peers would + // silently drop every write-path announce on the floor. + a.mu.Lock() + _, stillPending := a.pending["3,a"] + a.mu.Unlock() + if !stillPending { + t.Errorf("self-owned fid should stay pending for retry") + } +} + +func TestPeerAnnouncer_RequeueOnRejection(t *testing.T) { + fc := &fakeMountPeerClient{ + announcedBy: map[string][]filerAnnouncement{}, + rejectedByClient: func(fid string) bool { return fid == "3,x" }, + } + rec := &dialRecorder{} + + a := NewPeerAnnouncer("self:18080", "", "",func(fid string) string { + return "owner:18081" + }, fakeDialer(fc, rec), nil) + a.EnqueueAnnounce("3,x") + a.EnqueueAnnounce("3,y") + + a.FlushForTest(context.Background()) + + _, sent, rejected, _ := a.Stats() + if sent != 1 { + t.Errorf("expected 1 sent, got %d", sent) + } + if rejected != 1 { + t.Errorf("expected 1 rejected, got %d", rejected) + } + + // The rejected fid should be back in pending. + a.mu.Lock() + _, stillPending := a.pending["3,x"] + a.mu.Unlock() + if !stillPending { + t.Errorf("3,x should have been requeued after rejection") + } +} + +func TestPeerAnnouncer_TTLRenewal(t *testing.T) { + fc := &fakeMountPeerClient{announcedBy: map[string][]filerAnnouncement{}} + rec := &dialRecorder{} + + now := time.Unix(1000, 0) + a := NewPeerAnnouncer("self:18080", "", "",func(fid string) string { + return "owner:18081" + }, fakeDialer(fc, rec), nil) + a.clock = func() time.Time { return now } + a.announceInterval = 10 * time.Second + a.announceTTL = 60 * time.Second + + // Announce once. 
+ a.EnqueueAnnounce("3,a") + a.FlushForTest(context.Background()) + if _, sent, _, _ := a.Stats(); sent != 1 { + t.Fatalf("want 1 sent on initial announce, got %d", sent) + } + + // Time passes to within the renewal threshold. + now = now.Add(45 * time.Second) + + // No new enqueue, but renewal should fire. + a.FlushForTest(context.Background()) + if _, sent, _, _ := a.Stats(); sent != 2 { + t.Errorf("want 2 sent after renewal, got %d", sent) + } +} + +// TestPeerAnnouncer_ReAnnouncesOnOwnerChange guards the fix for a +// startup race where the writer's seed view reaches self-knowledge of +// its peers in two stages: first just {self,A}, then {self,A,B}. HRW +// may pick different owners across those views, so the announce from +// stage 1 is sent to a mount that will NOT be the HRW owner by the +// time a reader's seed view is the {self,A,B} version. Without +// re-announcing on owner change, the fid silently ages out on the +// wrong directory and readers get "no peer holder." +func TestPeerAnnouncer_ReAnnouncesOnOwnerChange(t *testing.T) { + fc := &fakeMountPeerClient{announcedBy: map[string][]filerAnnouncement{}} + rec := &dialRecorder{} + + ownerNow := "owner-1:18081" + a := NewPeerAnnouncer("self:18080", "", "", func(fid string) string { + return ownerNow + }, fakeDialer(fc, rec), nil) + + a.EnqueueAnnounce("3,x") + a.FlushForTest(context.Background()) + if _, sent, _, _ := a.Stats(); sent != 1 { + t.Fatalf("initial announce: want 1 sent, got %d", sent) + } + + // Seed view shifts; HRW now picks a different owner for the same + // fid. A fresh-timestamped entry in announcedAt should be treated + // as stale in owner terms and re-announced even though its age is + // well within the TTL renewal window. 
+ ownerNow = "owner-2:18081" + a.FlushForTest(context.Background()) + if _, sent, _, _ := a.Stats(); sent != 2 { + t.Errorf("after owner change: want 2 sent, got %d", sent) + } + dialed := rec.snapshot() + if len(dialed) != 2 || dialed[0] == dialed[1] { + t.Errorf("want dial to both owners, got %v", dialed) + } +} + +// TestPeerAnnouncer_DropsEvictedFids guards the write→announce race: +// a chunk can be LRU-evicted between SetChunk and the next flush tick. +// Advertising an evicted fid just hands remote fetchers a NOT_FOUND +// from our FetchChunk server. With SetCachePresence wired, the +// announcer must drop evicted fids before dispatching the RPC and +// also clear them from announcedAt so they stop getting renewed. +func TestPeerAnnouncer_DropsEvictedFids(t *testing.T) { + fc := &fakeMountPeerClient{announcedBy: map[string][]filerAnnouncement{}} + rec := &dialRecorder{} + present := map[string]bool{"3,here": true, "3,gone": false} + + a := NewPeerAnnouncer("self:18080", "", "", func(fid string) string { + return "owner:18081" + }, fakeDialer(fc, rec), nil) + a.SetCachePresence(func(fid string) bool { return present[fid] }) + + a.EnqueueAnnounce("3,here") + a.EnqueueAnnounce("3,gone") + a.FlushForTest(context.Background()) + + if _, sent, _, _ := a.Stats(); sent != 1 { + t.Errorf("only the still-cached fid should be announced, got sent=%d", sent) + } + fc.mu.Lock() + defer fc.mu.Unlock() + for _, an := range fc.announcedBy["self:18080"] { + for _, fid := range an.fids { + if fid == "3,gone" { + t.Errorf("evicted fid should not have been announced, saw %q", fid) + } + } + } +} + +// TestPeerAnnouncer_StopWaitsForFlush guards the shutdown race: Stop +// must not return while a tick-triggered flushOnce is still dispatching +// RPCs, otherwise the owning wfs can tear down the conn pool out from +// under the in-flight sendTo goroutines. 
+func TestPeerAnnouncer_StopWaitsForFlush(t *testing.T) { + started := make(chan struct{}) + unblock := make(chan struct{}) + slowDialer := func(ctx context.Context, peerAddr string) (mount_peer_pb.MountPeerClient, func(), error) { + // Signal that the flush reached the dial, then block until the + // test releases it or the flush context ends. + close(started) + select { + case <-unblock: + case <-ctx.Done(): + } + return nil, func() {}, context.DeadlineExceeded + } + + a := NewPeerAnnouncer("self:18080", "", "", func(string) string { + return "owner:18081" + }, slowDialer, nil) + + // Run one flush directly on its own goroutine (Start is never + // called) and wait until it is blocked inside the dialer. + a.EnqueueAnnounce("3,slow") + flushDone := make(chan struct{}) + go func() { + a.FlushForTest(context.Background()) + close(flushDone) + }() + <-started + + // Call Stop() on a second goroutine, then release the dialer. + // This flush bypasses run(), so it is neither counted in runWg + // nor registered in flushCancel: Stop() can neither cancel it + // nor wait for it. The selects below therefore only check that + // the flush and Stop() each finish within their 3 s deadline. + stopDone := make(chan struct{}) + go func() { + a.Stop() + close(stopDone) + }() + close(unblock) + + select { + case <-flushDone: + case <-time.After(3 * time.Second): + t.Fatalf("flush did not complete within deadline") + } + select { + case <-stopDone: + case <-time.After(3 * time.Second): + t.Fatalf("Stop() did not return after flush completed") + } +} + +// TestPeerAnnouncer_SelfOwnedWritesToLocalDir guards the shortcut path +// for fids whose HRW owner resolves to self: no RPC should go out, but +// the local PeerDirectory must end up with a holder entry pointing at +// self — otherwise remote ChunkLookup callers would get an empty +// holder list for fids this mount owns directly. 
+func TestPeerAnnouncer_SelfOwnedWritesToLocalDir(t *testing.T) { + fc := &fakeMountPeerClient{announcedBy: map[string][]filerAnnouncement{}} + rec := &dialRecorder{} + dir := NewPeerDirectory() + a := NewPeerAnnouncer("self:18080", "dc1", "rackA", func(fid string) string { + return "self:18080" + }, fakeDialer(fc, rec), dir) + + a.EnqueueAnnounce("3,selfowned") + a.FlushForTest(context.Background()) + + if d := rec.snapshot(); len(d) != 0 { + t.Errorf("self-owned fid must not trigger any dial, got %v", d) + } + holders := dir.Lookup([]string{"3,selfowned"}, nil).PeersByFid["3,selfowned"] + if len(holders) != 1 { + t.Fatalf("local dir should have 1 holder after self-owned flush, got %d", len(holders)) + } + if h := holders[0]; h.PeerAddr != "self:18080" || h.DataCenter != "dc1" || h.Rack != "rackA" { + t.Errorf("local dir entry wrong: %+v", h) + } + if _, sent, _, _ := a.Stats(); sent != 1 { + t.Errorf("sent counter should increment for self-owned too; got %d", sent) + } +} + +func TestPeerAnnouncer_DialerErrorRequeues(t *testing.T) { + errDialer := func(ctx context.Context, peerAddr string) (mount_peer_pb.MountPeerClient, func(), error) { + return nil, func() {}, context.DeadlineExceeded + } + a := NewPeerAnnouncer("self:18080", "", "",func(fid string) string { + return "owner:18081" + }, errDialer, nil) + a.EnqueueAnnounce("3,a") + a.FlushForTest(context.Background()) + + _, _, _, errs := a.Stats() + if errs != 1 { + t.Errorf("expected 1 flush error, got %d", errs) + } + a.mu.Lock() + _, pending := a.pending["3,a"] + a.mu.Unlock() + if !pending { + t.Errorf("fid should be requeued after dial error") + } +} diff --git a/weed/mount/peer_dialer.go b/weed/mount/peer_dialer.go new file mode 100644 index 000000000..499890a63 --- /dev/null +++ b/weed/mount/peer_dialer.go @@ -0,0 +1,137 @@ +package mount + +import ( + "context" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/mount_peer_pb" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + 
"google.golang.org/grpc/credentials/insecure" +) + +// PeerConnPool caches one long-lived gRPC client connection per peer +// address. Both the announcer flush loop and the read-path fetcher hit +// the same handful of directory owners repeatedly; without a cache each +// call would pay TCP handshake + HTTP/2 preface cost (and on TLS, an +// additional handshake). The cache makes steady-state owner RPCs +// effectively free after the first call. +// +// Connections in terminal failure states (Shutdown) are transparently +// replaced on next access. Sizing: entries are ~1 KB + the conn itself; +// bounded at maxPeerConnPoolEntries to contain runaway growth. +type PeerConnPool struct { + dialOpts []grpc.DialOption + + mu sync.Mutex + conns map[string]*grpc.ClientConn +} + +// maxPeerConnPoolEntries caps live peer conns per mount. A 10k-mount +// fleet with HRW-sharded directory reaches only ~200 distinct owner +// addresses per mount in the worst case, so this is far above any real +// footprint while still bounding pathological growth. +const maxPeerConnPoolEntries = 4096 + +// NewPeerConnPool returns an empty pool. dialOpts should carry transport +// credentials matching the server side (production wires +// option.GrpcDialOption, which security.LoadClientTLS populates from +// security.toml). When no options are supplied we fall back to insecure +// cleartext — only safe for in-process tests. +func NewPeerConnPool(dialOpts ...grpc.DialOption) *PeerConnPool { + if len(dialOpts) == 0 { + dialOpts = []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + } + return &PeerConnPool{ + dialOpts: dialOpts, + conns: map[string]*grpc.ClientConn{}, + } +} + +// Dialer returns a MountPeerDialer bound to this pool. The returned +// closeFn is a no-op — the pool owns the connection lifecycle. Tests +// that want per-call dials can keep using the non-pooled variant. 
+func (p *PeerConnPool) Dialer() MountPeerDialer { + return func(ctx context.Context, peerAddr string) (mount_peer_pb.MountPeerClient, func(), error) { + conn, err := p.get(peerAddr) + if err != nil { + return nil, func() {}, err + } + return mount_peer_pb.NewMountPeerClient(conn), func() {}, nil + } +} + +func (p *PeerConnPool) get(peerAddr string) (*grpc.ClientConn, error) { + p.mu.Lock() + defer p.mu.Unlock() + + if conn, ok := p.conns[peerAddr]; ok { + if conn.GetState() != connectivity.Shutdown { + return conn, nil + } + // Pooled conn is unusable — drop it and redial below. + _ = conn.Close() + delete(p.conns, peerAddr) + } + if len(p.conns) >= maxPeerConnPoolEntries { + // Evict one arbitrary entry. Simple over LRU: the pool is small + // in practice, and the victim will be re-dialed if needed. + for k, c := range p.conns { + _ = c.Close() + delete(p.conns, k) + break + } + } + conn, err := grpc.NewClient(peerAddr, p.dialOpts...) + if err != nil { + return nil, err + } + p.conns[peerAddr] = conn + return conn, nil +} + +// Close tears down every cached connection. Safe to call multiple times. +func (p *PeerConnPool) Close() { + p.mu.Lock() + defer p.mu.Unlock() + for addr, c := range p.conns { + _ = c.Close() + delete(p.conns, addr) + } +} + +// Size returns the current number of cached connections — useful for +// tests and metrics exports. +func (p *PeerConnPool) Size() int { + p.mu.Lock() + defer p.mu.Unlock() + return len(p.conns) +} + +// DefaultMountPeerDialer returns a per-call dialer (no pooling). Kept for +// tests and for any caller that genuinely wants a fresh connection per +// invocation. Production code should prefer PeerConnPool.Dialer(). +// +// Unused options silence dial-time lints when dialOpts is nil. +func DefaultMountPeerDialer(dialOpts ...grpc.DialOption) MountPeerDialer { + opts := append([]grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + }, dialOpts...) 
+ return func(ctx context.Context, peerAddr string) (mount_peer_pb.MountPeerClient, func(), error) { + conn, err := grpc.NewClient(peerAddr, opts...) + if err != nil { + return nil, func() {}, err + } + return mount_peer_pb.NewMountPeerClient(conn), func() { _ = conn.Close() }, nil + } +} + +// peerConnMaxAge lets external callers (or future metrics) decide when a +// pool entry is "stale" for monitoring purposes. The pool itself does not +// expire on age — grpc handles reconnect internally. +var peerConnMaxAge = 10 * time.Minute + +var _ = peerConnMaxAge // suppress unused-lint until a metric consumes it diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 1f365beb8..f305214f4 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -142,6 +142,8 @@ type WFS struct { peerRegistrar *PeerRegistrar peerDirectory *PeerDirectory peerGrpcServer *PeerGrpcServer + peerAnnouncer *PeerAnnouncer + peerConnPool *PeerConnPool peerDirectoryStop chan struct{} // closed on unmount to stop the sweeper goroutine FilerConf *filer.FilerConf filerClient *wdclient.FilerClient // Cached volume location client @@ -340,6 +342,12 @@ func NewSeaweedFileSystem(option *Option) *WFS { if wfs.rdmaClient != nil { wfs.rdmaClient.Close() } + if wfs.peerAnnouncer != nil { + wfs.peerAnnouncer.Stop() + } + if wfs.peerConnPool != nil { + wfs.peerConnPool.Close() + } if wfs.peerGrpcServer != nil { wfs.peerGrpcServer.Stop() } @@ -427,6 +435,32 @@ func NewSeaweedFileSystem(option *Option) *WFS { } else { wfs.peerDirectoryStop = make(chan struct{}) go wfs.runPeerDirectorySweeper(wfs.peerDirectoryStop) + + // Shared connection pool + announcer. Pool reuses one + // grpc.ClientConn per owner mount across both the + // announcer flush and the fetcher's ChunkLookup + + // FetchChunk calls. Transport credentials come from + // option.GrpcDialOption (security.LoadClientTLS), so + // peer dials match the TLS posture the server wants. 
+ wfs.peerConnPool = NewPeerConnPool(option.GrpcDialOption) + wfs.peerAnnouncer = NewPeerAnnouncer( + selfAddr, + option.PeerDataCenter, + option.PeerRack, + wfs.peerRegistrar.OwnerFor, + wfs.peerConnPool.Dialer(), + wfs.peerDirectory, + ) + // Close the write→announce race: between SetChunk and + // the flush tick (up to 15 s) the cache can LRU-evict + // the chunk. Skip announcing fids we no longer hold. + if wfs.chunkCache != nil { + cache := wfs.chunkCache + wfs.peerAnnouncer.SetCachePresence(func(fid string) bool { + return cache.IsInCache(fid, true) + }) + } + wfs.peerAnnouncer.Start() } } } diff --git a/weed/mount/weedfs_write.go b/weed/mount/weedfs_write.go index 42d8c891d..552202ef5 100644 --- a/weed/mount/weedfs_write.go +++ b/weed/mount/weedfs_write.go @@ -57,9 +57,29 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun return nil, fmt.Errorf("upload result: %v", uploadResult.Error) } - if offset == 0 { + // When peer sharing is enabled we need EVERY chunk in the + // local cache so we can actually serve it back to peers on + // FetchChunk — otherwise the directory would advertise us as + // a holder and the fetcher would get NOT_FOUND from our + // chunk cache. When peer sharing is off we preserve the + // original behavior of caching only the first chunk (small + // files) to avoid blowing the cache on large uploads. Both + // paths gate on chunkCache != nil: -cacheCapacityMB=0 disables + // the cache entirely, in which case SetChunk would panic. + shouldCache := wfs.chunkCache != nil && (offset == 0 || wfs.peerAnnouncer != nil) + if shouldCache { wfs.chunkCache.SetChunk(fileId, data) } + // Announce every uploaded chunk so the tier-2 directory fills + // in as the file is written. Without this, the per-fetch + // announce path only bootstraps after someone else has already + // pulled a chunk via peer — which can't happen if nobody has + // told the directory who holds the chunk. 
Skip the announce + // when we couldn't cache (no point advertising bytes we can't + // actually serve back). + if wfs.peerAnnouncer != nil && shouldCache { + wfs.peerAnnouncer.EnqueueAnnounce(fileId) + } chunk = uploadResult.ToPbFileChunk(fileId, offset, tsNs) return chunk, nil