peer chunk sharing 6/8: announce queue + batched flush (#9135)

mount: batched announcer + pooled peer conns for mount-to-mount RPCs

* peer_announcer.go: non-blocking EnqueueAnnounce + ticker flush that
  groups fids by HRW owner, fans out one ChunkAnnounce per owner in
  parallel. announcedAt is pruned at 2× TTL so it stays bounded.

* peer_dialer.go: PeerConnPool caches one grpc.ClientConn per peer
  address; the announcer and (next PR) the fetcher share it so
  steady-state owner RPCs skip the handshake cost entirely. Bounded
  at 4096 cached entries; shutdown conns are transparently replaced.

* WFS starts both alongside the gRPC server; stops them on unmount.
This commit is contained in:
Chris Lu
2026-04-18 21:42:36 -07:00
committed by GitHub
parent fe9ca35bbd
commit 73f10fa528
5 changed files with 940 additions and 1 deletions

View File

@@ -0,0 +1,374 @@
package mount
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/mount_peer_pb"
)
// PeerAnnouncer batches and flushes ChunkAnnounce RPCs to the
// HRW-assigned owner mounts for each cached fid this mount holds.
//
// Shape:
// * EnqueueAnnounce(fid) is a non-blocking push into an in-memory set.
// * A background ticker flushes every announceInterval. Each flush
// drains the set, adds fids due for TTL renewal, groups by owner
// mount via HRW, and sends one ChunkAnnounce RPC per distinct owner.
// * A successful send records announced_at[fid]. A rejected (not-owner)
// fid is requeued so it will be retried after the seed view refreshes.
//
// See design-weed-mount-peer-chunk-sharing.md §4.4.
type PeerAnnouncer struct {
selfAddr string
selfDataCenter string
selfRack string
ownerFor func(fid string) string
dialPeer MountPeerDialer
localDir *PeerDirectory // populated directly for fids whose HRW owner == self
isCached func(fid string) bool // residency check; set nil to disable
announceInterval time.Duration
announceTTL time.Duration
mu sync.Mutex
pending map[string]struct{}
announcedAt map[string]announceRecord
stopCh chan struct{}
stopped atomic.Bool
runWg sync.WaitGroup // tracks the run goroutine so Stop can wait
// flushCancelMu guards flushCancel, which is the cancel fn for the
// currently in-flight flush (nil when idle). Stop takes the lock,
// cancels the flush to unblock its RPCs fast, then waits on runWg.
flushCancelMu sync.Mutex
flushCancel context.CancelFunc
flushes atomic.Int64
sentFids atomic.Int64
rejectedFids atomic.Int64
flushErrs atomic.Int64
clock func() time.Time
}
// MountPeerDialer opens a MountPeer gRPC client to a given peer. Tests
// inject a fake; production uses a real gRPC dial backed by a short
// connection cache.
type MountPeerDialer func(ctx context.Context, peerAddr string) (mount_peer_pb.MountPeerClient, func(), error)
// announceRecord is what we remember about a successful announce: who
// we told (the HRW owner at the time) and when. On each flush we
// recompute the HRW owner and, if it moved — typically because a new
// mount joined the seed list and the ring rebalanced — re-announce
// against the new owner. Without this, seed-view divergence during
// startup silently ages out fids on an owner that the reader never
// looks up.
type announceRecord struct {
owner string
at time.Time
}
// NewPeerAnnouncer constructs an announcer. Caller must call Start.
// selfDataCenter and selfRack are the locality labels attached to every
// ChunkAnnounce so the receiving directory records holders with DC+rack
// and the fetcher can later re-rank by locality.
//
// localDir is optional: when non-nil, any fid whose HRW owner resolves
// to self is written directly into localDir instead of sending a
// self→self RPC (which the receiver would reject anyway). This is the
// only way a fid the writer itself owns becomes visible to peers'
// ChunkLookup calls.
func NewPeerAnnouncer(selfAddr, selfDataCenter, selfRack string, ownerFor func(fid string) string, dial MountPeerDialer, localDir *PeerDirectory) *PeerAnnouncer {
return &PeerAnnouncer{
selfAddr: selfAddr,
selfDataCenter: selfDataCenter,
selfRack: selfRack,
ownerFor: ownerFor,
dialPeer: dial,
localDir: localDir,
announceInterval: 15 * time.Second,
announceTTL: 300 * time.Second,
pending: map[string]struct{}{},
announcedAt: map[string]announceRecord{},
stopCh: make(chan struct{}),
clock: time.Now,
}
}
// EnqueueAnnounce marks a fid as needing announcement. Non-blocking;
// dedupes within a flush window.
func (a *PeerAnnouncer) EnqueueAnnounce(fid string) {
a.mu.Lock()
a.pending[fid] = struct{}{}
a.mu.Unlock()
}
// SetCachePresence wires an optional residency check. When set, the
// announcer consults it right before dispatching a ChunkAnnounce RPC or
// writing a self-owned entry: if the cache no longer has the chunk
// (e.g. LRU-evicted under memory pressure between SetChunk and the
// next flush tick) we drop the announce rather than advertise bytes we
// can't serve back. nil disables the check.
func (a *PeerAnnouncer) SetCachePresence(isCached func(fid string) bool) {
a.mu.Lock()
defer a.mu.Unlock()
a.isCached = isCached
}
// Start launches the background flush loop.
func (a *PeerAnnouncer) Start() {
a.runWg.Add(1)
go func() {
defer a.runWg.Done()
a.run()
}()
}
// Stop halts the flush loop and waits for the current flush (if any) to
// finish. Safe to call multiple times. Callers that tear down shared
// dependencies (conn pool, local directory) can rely on no more
// sendTo/localDir.Announce goroutines being live after Stop returns.
func (a *PeerAnnouncer) Stop() {
if a.stopped.Swap(true) {
return
}
close(a.stopCh)
// Cancel the in-flight flush's context so its RPCs return quickly
// instead of running out the 10 s timeout.
a.flushCancelMu.Lock()
if a.flushCancel != nil {
a.flushCancel()
}
a.flushCancelMu.Unlock()
a.runWg.Wait()
}
// Stats exposes counters for observability.
func (a *PeerAnnouncer) Stats() (flushes, sentFids, rejectedFids, flushErrs int64) {
return a.flushes.Load(), a.sentFids.Load(), a.rejectedFids.Load(), a.flushErrs.Load()
}
// announceFlushTimeout caps how long a single flush cycle can run. A slow
// or unresponsive owner peer would otherwise block a flush goroutine
// indefinitely, and since flushes fan out per owner in parallel a
// pileup would bleed into subsequent ticks and grow unbounded. The
// bound is set a bit under one flush interval so the next tick's flush
// doesn't chase the previous one's tail.
const announceFlushTimeout = 10 * time.Second
func (a *PeerAnnouncer) run() {
t := time.NewTicker(a.announceInterval)
defer t.Stop()
for {
select {
case <-a.stopCh:
return
case <-t.C:
ctx, cancel := context.WithTimeout(context.Background(), announceFlushTimeout)
a.flushCancelMu.Lock()
a.flushCancel = cancel
a.flushCancelMu.Unlock()
a.flushOnce(ctx)
a.flushCancelMu.Lock()
a.flushCancel = nil
a.flushCancelMu.Unlock()
cancel()
}
}
}
// flushOnce is exported via a wrapper for tests.
func (a *PeerAnnouncer) FlushForTest(ctx context.Context) {
a.flushOnce(ctx)
}
func (a *PeerAnnouncer) flushOnce(ctx context.Context) {
a.flushes.Add(1)
now := a.clock()
a.mu.Lock()
// Swap pending in O(1) instead of copying.
toAnnounce := a.pending
a.pending = map[string]struct{}{}
// Add renewals: (1) fids we announced long enough ago that they'll
// expire on the owner side before our next flush tick; (2) fids
// whose HRW owner has changed since we last announced them —
// usually because a new mount joined the seed list and the ring
// rebalanced. Also prune announcedAt entries that are so old
// (> 2× TTL) that re-announcing them would be pointless.
renewThreshold := now.Add(-a.announceTTL + 2*a.announceInterval)
staleCutoff := now.Add(-2 * a.announceTTL)
for fid, rec := range a.announcedAt {
switch {
case rec.at.Before(staleCutoff):
delete(a.announcedAt, fid)
case rec.at.Before(renewThreshold):
toAnnounce[fid] = struct{}{}
default:
// Same fid, fresh timestamp — but owner may have moved.
// Force a re-announce if HRW says a different mount is
// the owner now.
if a.ownerFor != nil {
if cur := a.ownerFor(fid); cur != "" && cur != rec.owner {
toAnnounce[fid] = struct{}{}
}
}
}
}
a.mu.Unlock()
if len(toAnnounce) == 0 {
return
}
// Drop fids the cache no longer has. The write path hooks into
// SetChunk + EnqueueAnnounce, but the chunk can be LRU-evicted in
// the window between that and the next 15 s flush. Advertising a
// chunk we can't actually serve just sends remote fetchers to our
// FetchChunk, which then returns NOT_FOUND — a wasted round trip.
// Also drop the announcedAt record so we don't keep renewing
// evicted fids forever.
a.mu.Lock()
present := a.isCached
a.mu.Unlock()
if present != nil {
for fid := range toAnnounce {
if !present(fid) {
delete(toAnnounce, fid)
a.mu.Lock()
delete(a.announcedAt, fid)
a.mu.Unlock()
}
}
if len(toAnnounce) == 0 {
return
}
}
// Classify each fid. Three buckets:
// byOwner — HRW owner is a remote mount; send a ChunkAnnounce RPC.
// selfOwned — HRW owner is us; write directly into our local
// directory (no RPC — we ARE the directory for this
// fid). Without this, fids we wrote AND HRW-own stay
// invisible to peer ChunkLookup forever.
// deferred — HRW owner unknown (empty seed view). Re-queue for
// the next flush once listOnce has populated seeds.
byOwner := map[string][]string{}
var selfOwned []string
var deferred []string
for fid := range toAnnounce {
owner := ""
if a.ownerFor != nil {
owner = a.ownerFor(fid)
}
switch owner {
case "":
deferred = append(deferred, fid)
case a.selfAddr:
selfOwned = append(selfOwned, fid)
default:
byOwner[owner] = append(byOwner[owner], fid)
}
}
if len(selfOwned) > 0 && a.localDir != nil {
a.localDir.Announce(a.selfAddr, a.selfDataCenter, a.selfRack, selfOwned, a.announceTTL, nil)
a.mu.Lock()
for _, fid := range selfOwned {
a.announcedAt[fid] = announceRecord{owner: a.selfAddr, at: now}
a.sentFids.Add(1)
}
a.mu.Unlock()
} else if len(selfOwned) > 0 {
// No local directory wired (e.g. unit tests with nil). Re-queue
// so a future flush with a possibly-different HRW owner can
// send them.
deferred = append(deferred, selfOwned...)
}
if len(deferred) > 0 {
a.mu.Lock()
for _, fid := range deferred {
a.pending[fid] = struct{}{}
}
a.mu.Unlock()
}
// Diagnostic V(2): shape of the current flush. Helps post-mortem
// debugging when an announce pipeline silently stops making progress
// (e.g. seed view stuck at self-only, owner unreachable, etc).
if len(byOwner) > 0 || len(selfOwned) > 0 || len(deferred) > 0 {
glog.V(2).Infof("peer-announce flush: toAnnounce=%d byOwner=%d selfOwned=%d deferred=%d",
len(toAnnounce), len(byOwner), len(selfOwned), len(deferred))
}
// Fan out one RPC per owner in parallel. A slow or unreachable
// owner blocks only its own goroutine; other owners' announces
// proceed immediately.
var wg sync.WaitGroup
for owner, fids := range byOwner {
wg.Add(1)
go func(owner string, fids []string) {
defer wg.Done()
a.sendTo(ctx, owner, fids, now)
}(owner, fids)
}
wg.Wait()
}
func (a *PeerAnnouncer) sendTo(ctx context.Context, owner string, fids []string, now time.Time) {
client, closeFn, err := a.dialPeer(ctx, owner)
if err != nil {
a.flushErrs.Add(1)
// Requeue these fids: a future flush will retry.
a.requeue(fids)
glog.V(2).Infof("peer-announce dial %s: %v", owner, err)
return
}
defer closeFn()
resp, err := client.ChunkAnnounce(ctx, &mount_peer_pb.ChunkAnnounceRequest{
FileIds: fids,
PeerAddr: a.selfAddr,
DataCenter: a.selfDataCenter,
Rack: a.selfRack,
TtlSeconds: int32(a.announceTTL / time.Second),
})
if err != nil {
a.flushErrs.Add(1)
a.requeue(fids)
glog.V(2).Infof("peer-announce %s: %v", owner, err)
return
}
rejected := map[string]struct{}{}
for _, f := range resp.RejectedFileIds {
rejected[f] = struct{}{}
}
a.mu.Lock()
for _, fid := range fids {
if _, skip := rejected[fid]; skip {
// Owner has changed or doesn't yet agree; retry later.
a.pending[fid] = struct{}{}
a.rejectedFids.Add(1)
continue
}
a.announcedAt[fid] = announceRecord{owner: owner, at: now}
a.sentFids.Add(1)
}
a.mu.Unlock()
}
func (a *PeerAnnouncer) requeue(fids []string) {
a.mu.Lock()
for _, fid := range fids {
a.pending[fid] = struct{}{}
}
a.mu.Unlock()
}

View File

@@ -0,0 +1,374 @@
package mount
import (
"context"
"sort"
"sync"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/mount_peer_pb"
"google.golang.org/grpc"
)
// fakeMountPeerClient records each ChunkAnnounce call; returns rejected
// fids from a configured set.
type fakeMountPeerClient struct {
mu sync.Mutex
mount_peer_pb.MountPeerClient
announcedBy map[string][]filerAnnouncement
rejectedByClient func(fid string) bool
}
type filerAnnouncement struct {
fids []string
peerAddr string
}
func (f *fakeMountPeerClient) ChunkAnnounce(ctx context.Context, req *mount_peer_pb.ChunkAnnounceRequest, opts ...grpc.CallOption) (*mount_peer_pb.ChunkAnnounceResponse, error) {
f.mu.Lock()
defer f.mu.Unlock()
// No owner key in the request; capture keyed by peer_addr (the announcer).
f.announcedBy[req.PeerAddr] = append(f.announcedBy[req.PeerAddr], filerAnnouncement{
fids: append([]string(nil), req.FileIds...),
peerAddr: req.PeerAddr,
})
resp := &mount_peer_pb.ChunkAnnounceResponse{}
if f.rejectedByClient != nil {
for _, fid := range req.FileIds {
if f.rejectedByClient(fid) {
resp.RejectedFileIds = append(resp.RejectedFileIds, fid)
}
}
}
return resp, nil
}
// fakeDialer returns the same fakeMountPeerClient for every peer addr,
// recording which owner the announcer dialed. Close is a no-op. The
// dialed slice is protected by its own mutex because the announcer now
// fans RPCs out across goroutines — concurrent appends would race.
type dialRecorder struct {
mu sync.Mutex
dialled []string
}
func (d *dialRecorder) record(addr string) {
d.mu.Lock()
d.dialled = append(d.dialled, addr)
d.mu.Unlock()
}
func (d *dialRecorder) snapshot() []string {
d.mu.Lock()
defer d.mu.Unlock()
out := make([]string, len(d.dialled))
copy(out, d.dialled)
return out
}
func fakeDialer(fc *fakeMountPeerClient, rec *dialRecorder) MountPeerDialer {
return func(ctx context.Context, peerAddr string) (mount_peer_pb.MountPeerClient, func(), error) {
rec.record(peerAddr)
return fc, func() {}, nil
}
}
func TestPeerAnnouncer_FlushGroupsByOwner(t *testing.T) {
fc := &fakeMountPeerClient{
announcedBy: map[string][]filerAnnouncement{},
}
rec := &dialRecorder{}
// Owner function: static assignment for test.
ownerFor := func(fid string) string {
if fid == "3,a" || fid == "3,c" {
return "owner-1:18081"
}
return "owner-2:18081"
}
a := NewPeerAnnouncer("self:18080", "", "",ownerFor, fakeDialer(fc, rec), nil)
a.EnqueueAnnounce("3,a")
a.EnqueueAnnounce("3,b")
a.EnqueueAnnounce("3,c")
a.FlushForTest(context.Background())
dialed := rec.snapshot(); sort.Strings(dialed)
if len(dialed) != 2 || dialed[0] != "owner-1:18081" || dialed[1] != "owner-2:18081" {
t.Errorf("expected both owners dialed once, got %v", dialed)
}
_, sent, _, _ := a.Stats()
if sent != 3 {
t.Errorf("expected 3 sent fids, got %d", sent)
}
}
func TestPeerAnnouncer_SkipOwnerEqualsSelf(t *testing.T) {
fc := &fakeMountPeerClient{announcedBy: map[string][]filerAnnouncement{}}
rec := &dialRecorder{}
a := NewPeerAnnouncer("self:18080", "", "",func(fid string) string {
return "self:18080"
}, fakeDialer(fc, rec), nil)
a.EnqueueAnnounce("3,a")
a.FlushForTest(context.Background())
if d := rec.snapshot(); len(d) != 0 {
t.Errorf("expected no dial when owner == self, dialed: %v", d)
}
// Self-owned fids stay in pending so a subsequent flush re-checks
// them against a possibly-refreshed seed view. Without this retry a
// mount whose initial MountList hadn't yet observed its peers would
// silently drop every write-path announce on the floor.
a.mu.Lock()
_, stillPending := a.pending["3,a"]
a.mu.Unlock()
if !stillPending {
t.Errorf("self-owned fid should stay pending for retry")
}
}
func TestPeerAnnouncer_RequeueOnRejection(t *testing.T) {
fc := &fakeMountPeerClient{
announcedBy: map[string][]filerAnnouncement{},
rejectedByClient: func(fid string) bool { return fid == "3,x" },
}
rec := &dialRecorder{}
a := NewPeerAnnouncer("self:18080", "", "",func(fid string) string {
return "owner:18081"
}, fakeDialer(fc, rec), nil)
a.EnqueueAnnounce("3,x")
a.EnqueueAnnounce("3,y")
a.FlushForTest(context.Background())
_, sent, rejected, _ := a.Stats()
if sent != 1 {
t.Errorf("expected 1 sent, got %d", sent)
}
if rejected != 1 {
t.Errorf("expected 1 rejected, got %d", rejected)
}
// The rejected fid should be back in pending.
a.mu.Lock()
_, stillPending := a.pending["3,x"]
a.mu.Unlock()
if !stillPending {
t.Errorf("3,x should have been requeued after rejection")
}
}
func TestPeerAnnouncer_TTLRenewal(t *testing.T) {
fc := &fakeMountPeerClient{announcedBy: map[string][]filerAnnouncement{}}
rec := &dialRecorder{}
now := time.Unix(1000, 0)
a := NewPeerAnnouncer("self:18080", "", "",func(fid string) string {
return "owner:18081"
}, fakeDialer(fc, rec), nil)
a.clock = func() time.Time { return now }
a.announceInterval = 10 * time.Second
a.announceTTL = 60 * time.Second
// Announce once.
a.EnqueueAnnounce("3,a")
a.FlushForTest(context.Background())
if _, sent, _, _ := a.Stats(); sent != 1 {
t.Fatalf("want 1 sent on initial announce, got %d", sent)
}
// Time passes to within the renewal threshold.
now = now.Add(45 * time.Second)
// No new enqueue, but renewal should fire.
a.FlushForTest(context.Background())
if _, sent, _, _ := a.Stats(); sent != 2 {
t.Errorf("want 2 sent after renewal, got %d", sent)
}
}
// TestPeerAnnouncer_ReAnnouncesOnOwnerChange guards the fix for a
// startup race where the writer's seed view reaches self-knowledge of
// its peers in two stages: first just {self,A}, then {self,A,B}. HRW
// may pick different owners across those views, so the announce from
// stage 1 is sent to a mount that will NOT be the HRW owner by the
// time a reader's seed view is the {self,A,B} version. Without
// re-announcing on owner change, the fid silently ages out on the
// wrong directory and readers get "no peer holder."
func TestPeerAnnouncer_ReAnnouncesOnOwnerChange(t *testing.T) {
fc := &fakeMountPeerClient{announcedBy: map[string][]filerAnnouncement{}}
rec := &dialRecorder{}
ownerNow := "owner-1:18081"
a := NewPeerAnnouncer("self:18080", "", "", func(fid string) string {
return ownerNow
}, fakeDialer(fc, rec), nil)
a.EnqueueAnnounce("3,x")
a.FlushForTest(context.Background())
if _, sent, _, _ := a.Stats(); sent != 1 {
t.Fatalf("initial announce: want 1 sent, got %d", sent)
}
// Seed view shifts; HRW now picks a different owner for the same
// fid. A fresh-timestamped entry in announcedAt should be treated
// as stale in owner terms and re-announced even though its age is
// well within the TTL renewal window.
ownerNow = "owner-2:18081"
a.FlushForTest(context.Background())
if _, sent, _, _ := a.Stats(); sent != 2 {
t.Errorf("after owner change: want 2 sent, got %d", sent)
}
dialed := rec.snapshot()
if len(dialed) != 2 || dialed[0] == dialed[1] {
t.Errorf("want dial to both owners, got %v", dialed)
}
}
// TestPeerAnnouncer_DropsEvictedFids guards the write→announce race:
// a chunk can be LRU-evicted between SetChunk and the next flush tick.
// Advertising an evicted fid just hands remote fetchers a NOT_FOUND
// from our FetchChunk server. With SetCachePresence wired, the
// announcer must drop evicted fids before dispatching the RPC and
// also clear them from announcedAt so they stop getting renewed.
func TestPeerAnnouncer_DropsEvictedFids(t *testing.T) {
fc := &fakeMountPeerClient{announcedBy: map[string][]filerAnnouncement{}}
rec := &dialRecorder{}
present := map[string]bool{"3,here": true, "3,gone": false}
a := NewPeerAnnouncer("self:18080", "", "", func(fid string) string {
return "owner:18081"
}, fakeDialer(fc, rec), nil)
a.SetCachePresence(func(fid string) bool { return present[fid] })
a.EnqueueAnnounce("3,here")
a.EnqueueAnnounce("3,gone")
a.FlushForTest(context.Background())
if _, sent, _, _ := a.Stats(); sent != 1 {
t.Errorf("only the still-cached fid should be announced, got sent=%d", sent)
}
fc.mu.Lock()
defer fc.mu.Unlock()
for _, an := range fc.announcedBy["self:18080"] {
for _, fid := range an.fids {
if fid == "3,gone" {
t.Errorf("evicted fid should not have been announced, saw %q", fid)
}
}
}
}
// TestPeerAnnouncer_StopWaitsForFlush guards the shutdown race: Stop
// must not return while a tick-triggered flushOnce is still dispatching
// RPCs, otherwise the owning wfs can tear down the conn pool out from
// under the in-flight sendTo goroutines.
func TestPeerAnnouncer_StopWaitsForFlush(t *testing.T) {
started := make(chan struct{})
unblock := make(chan struct{})
slowDialer := func(ctx context.Context, peerAddr string) (mount_peer_pb.MountPeerClient, func(), error) {
close(started)
select {
case <-unblock:
case <-ctx.Done():
}
return nil, func() {}, context.DeadlineExceeded
}
a := NewPeerAnnouncer("self:18080", "", "", func(string) string {
return "owner:18081"
}, slowDialer, nil)
// Shortcut straight into a flush on a dedicated goroutine so we
// can assert Stop() blocks on it.
a.EnqueueAnnounce("3,slow")
flushDone := make(chan struct{})
go func() {
a.FlushForTest(context.Background())
close(flushDone)
}()
<-started
// Prove Stop() returns only after the flush finishes by racing
// them: Stop() normally can't cancel a FlushForTest because
// we're not going through run(); but unblocking after stop's
// goroutine starts lets us confirm flushDone happens before
// stopDone never does under the old code.
stopDone := make(chan struct{})
go func() {
a.Stop()
close(stopDone)
}()
close(unblock)
select {
case <-flushDone:
case <-time.After(3 * time.Second):
t.Fatalf("flush did not complete within deadline")
}
select {
case <-stopDone:
case <-time.After(3 * time.Second):
t.Fatalf("Stop() did not return after flush completed")
}
}
// TestPeerAnnouncer_SelfOwnedWritesToLocalDir guards the shortcut path
// for fids whose HRW owner resolves to self: no RPC should go out, but
// the local PeerDirectory must end up with a holder entry pointing at
// self — otherwise remote ChunkLookup callers would get an empty
// holder list for fids this mount owns directly.
func TestPeerAnnouncer_SelfOwnedWritesToLocalDir(t *testing.T) {
fc := &fakeMountPeerClient{announcedBy: map[string][]filerAnnouncement{}}
rec := &dialRecorder{}
dir := NewPeerDirectory()
a := NewPeerAnnouncer("self:18080", "dc1", "rackA", func(fid string) string {
return "self:18080"
}, fakeDialer(fc, rec), dir)
a.EnqueueAnnounce("3,selfowned")
a.FlushForTest(context.Background())
if d := rec.snapshot(); len(d) != 0 {
t.Errorf("self-owned fid must not trigger any dial, got %v", d)
}
holders := dir.Lookup([]string{"3,selfowned"}, nil).PeersByFid["3,selfowned"]
if len(holders) != 1 {
t.Fatalf("local dir should have 1 holder after self-owned flush, got %d", len(holders))
}
if h := holders[0]; h.PeerAddr != "self:18080" || h.DataCenter != "dc1" || h.Rack != "rackA" {
t.Errorf("local dir entry wrong: %+v", h)
}
if _, sent, _, _ := a.Stats(); sent != 1 {
t.Errorf("sent counter should increment for self-owned too; got %d", sent)
}
}
func TestPeerAnnouncer_DialerErrorRequeues(t *testing.T) {
errDialer := func(ctx context.Context, peerAddr string) (mount_peer_pb.MountPeerClient, func(), error) {
return nil, func() {}, context.DeadlineExceeded
}
a := NewPeerAnnouncer("self:18080", "", "",func(fid string) string {
return "owner:18081"
}, errDialer, nil)
a.EnqueueAnnounce("3,a")
a.FlushForTest(context.Background())
_, _, _, errs := a.Stats()
if errs != 1 {
t.Errorf("expected 1 flush error, got %d", errs)
}
a.mu.Lock()
_, pending := a.pending["3,a"]
a.mu.Unlock()
if !pending {
t.Errorf("fid should be requeued after dial error")
}
}

137
weed/mount/peer_dialer.go Normal file
View File

@@ -0,0 +1,137 @@
package mount
import (
"context"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/mount_peer_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
)
// PeerConnPool caches one long-lived gRPC client connection per peer
// address. Both the announcer flush loop and the read-path fetcher hit
// the same handful of directory owners repeatedly; without a cache each
// call would pay TCP handshake + HTTP/2 preface cost (and on TLS, an
// additional handshake). The cache makes steady-state owner RPCs
// effectively free after the first call.
//
// Connections in terminal failure states (Shutdown) are transparently
// replaced on next access. Sizing: entries are ~1 KB + the conn itself;
// bounded at maxPeerConnPoolEntries to contain runaway growth.
type PeerConnPool struct {
dialOpts []grpc.DialOption
mu sync.Mutex
conns map[string]*grpc.ClientConn
}
// maxPeerConnPoolEntries caps live peer conns per mount. A 10k-mount
// fleet with HRW-sharded directory reaches only ~200 distinct owner
// addresses per mount in the worst case, so this is far above any real
// footprint while still bounding pathological growth.
const maxPeerConnPoolEntries = 4096
// NewPeerConnPool returns an empty pool. dialOpts should carry transport
// credentials matching the server side (production wires
// option.GrpcDialOption, which security.LoadClientTLS populates from
// security.toml). When no options are supplied we fall back to insecure
// cleartext — only safe for in-process tests.
func NewPeerConnPool(dialOpts ...grpc.DialOption) *PeerConnPool {
if len(dialOpts) == 0 {
dialOpts = []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
}
return &PeerConnPool{
dialOpts: dialOpts,
conns: map[string]*grpc.ClientConn{},
}
}
// Dialer returns a MountPeerDialer bound to this pool. The returned
// closeFn is a no-op — the pool owns the connection lifecycle. Tests
// that want per-call dials can keep using the non-pooled variant.
func (p *PeerConnPool) Dialer() MountPeerDialer {
return func(ctx context.Context, peerAddr string) (mount_peer_pb.MountPeerClient, func(), error) {
conn, err := p.get(peerAddr)
if err != nil {
return nil, func() {}, err
}
return mount_peer_pb.NewMountPeerClient(conn), func() {}, nil
}
}
func (p *PeerConnPool) get(peerAddr string) (*grpc.ClientConn, error) {
p.mu.Lock()
defer p.mu.Unlock()
if conn, ok := p.conns[peerAddr]; ok {
if conn.GetState() != connectivity.Shutdown {
return conn, nil
}
// Pooled conn is unusable — drop it and redial below.
_ = conn.Close()
delete(p.conns, peerAddr)
}
if len(p.conns) >= maxPeerConnPoolEntries {
// Evict one arbitrary entry. Simple over LRU: the pool is small
// in practice, and the victim will be re-dialed if needed.
for k, c := range p.conns {
_ = c.Close()
delete(p.conns, k)
break
}
}
conn, err := grpc.NewClient(peerAddr, p.dialOpts...)
if err != nil {
return nil, err
}
p.conns[peerAddr] = conn
return conn, nil
}
// Close tears down every cached connection. Safe to call multiple times.
func (p *PeerConnPool) Close() {
p.mu.Lock()
defer p.mu.Unlock()
for addr, c := range p.conns {
_ = c.Close()
delete(p.conns, addr)
}
}
// Size returns the current number of cached connections — useful for
// tests and metrics exports.
func (p *PeerConnPool) Size() int {
p.mu.Lock()
defer p.mu.Unlock()
return len(p.conns)
}
// DefaultMountPeerDialer returns a per-call dialer (no pooling). Kept for
// tests and for any caller that genuinely wants a fresh connection per
// invocation. Production code should prefer PeerConnPool.Dialer().
//
// Unused options silence dial-time lints when dialOpts is nil.
func DefaultMountPeerDialer(dialOpts ...grpc.DialOption) MountPeerDialer {
opts := append([]grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}, dialOpts...)
return func(ctx context.Context, peerAddr string) (mount_peer_pb.MountPeerClient, func(), error) {
conn, err := grpc.NewClient(peerAddr, opts...)
if err != nil {
return nil, func() {}, err
}
return mount_peer_pb.NewMountPeerClient(conn), func() { _ = conn.Close() }, nil
}
}
// peerConnMaxAge lets external callers (or future metrics) decide when a
// pool entry is "stale" for monitoring purposes. The pool itself does not
// expire on age — grpc handles reconnect internally.
var peerConnMaxAge = 10 * time.Minute
var _ = peerConnMaxAge // suppress unused-lint until a metric consumes it

View File

@@ -142,6 +142,8 @@ type WFS struct {
peerRegistrar *PeerRegistrar
peerDirectory *PeerDirectory
peerGrpcServer *PeerGrpcServer
peerAnnouncer *PeerAnnouncer
peerConnPool *PeerConnPool
peerDirectoryStop chan struct{} // closed on unmount to stop the sweeper goroutine
FilerConf *filer.FilerConf
filerClient *wdclient.FilerClient // Cached volume location client
@@ -340,6 +342,12 @@ func NewSeaweedFileSystem(option *Option) *WFS {
if wfs.rdmaClient != nil {
wfs.rdmaClient.Close()
}
if wfs.peerAnnouncer != nil {
wfs.peerAnnouncer.Stop()
}
if wfs.peerConnPool != nil {
wfs.peerConnPool.Close()
}
if wfs.peerGrpcServer != nil {
wfs.peerGrpcServer.Stop()
}
@@ -427,6 +435,32 @@ func NewSeaweedFileSystem(option *Option) *WFS {
} else {
wfs.peerDirectoryStop = make(chan struct{})
go wfs.runPeerDirectorySweeper(wfs.peerDirectoryStop)
// Shared connection pool + announcer. Pool reuses one
// grpc.ClientConn per owner mount across both the
// announcer flush and the fetcher's ChunkLookup +
// FetchChunk calls. Transport credentials come from
// option.GrpcDialOption (security.LoadClientTLS), so
// peer dials match the TLS posture the server wants.
wfs.peerConnPool = NewPeerConnPool(option.GrpcDialOption)
wfs.peerAnnouncer = NewPeerAnnouncer(
selfAddr,
option.PeerDataCenter,
option.PeerRack,
wfs.peerRegistrar.OwnerFor,
wfs.peerConnPool.Dialer(),
wfs.peerDirectory,
)
// Close the write→announce race: between SetChunk and
// the flush tick (up to 15 s) the cache can LRU-evict
// the chunk. Skip announcing fids we no longer hold.
if wfs.chunkCache != nil {
cache := wfs.chunkCache
wfs.peerAnnouncer.SetCachePresence(func(fid string) bool {
return cache.IsInCache(fid, true)
})
}
wfs.peerAnnouncer.Start()
}
}
}

View File

@@ -57,9 +57,29 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
}
if offset == 0 {
// When peer sharing is enabled we need EVERY chunk in the
// local cache so we can actually serve it back to peers on
// FetchChunk — otherwise the directory would advertise us as
// a holder and the fetcher would get NOT_FOUND from our
// chunk cache. When peer sharing is off we preserve the
// original behavior of caching only the first chunk (small
// files) to avoid blowing the cache on large uploads. Both
// paths gate on chunkCache != nil: -cacheCapacityMB=0 disables
// the cache entirely, in which case SetChunk would panic.
shouldCache := wfs.chunkCache != nil && (offset == 0 || wfs.peerAnnouncer != nil)
if shouldCache {
wfs.chunkCache.SetChunk(fileId, data)
}
// Announce every uploaded chunk so the tier-2 directory fills
// in as the file is written. Without this, the per-fetch
// announce path only bootstraps after someone else has already
// pulled a chunk via peer — which can't happen if nobody has
// told the directory who holds the chunk. Skip the announce
// when we couldn't cache (no point advertising bytes we can't
// actually serve back).
if wfs.peerAnnouncer != nil && shouldCache {
wfs.peerAnnouncer.EnqueueAnnounce(fileId)
}
chunk = uploadResult.ToPbFileChunk(fileId, offset, tsNs)
return chunk, nil