From ebc6c9b49806bff7f2339e322d72afdda5d99487 Mon Sep 17 00:00:00 2001
From: Klaus Post
Date: Tue, 6 Feb 2024 08:57:30 -0800
Subject: [PATCH] Fix tracing send on closed channel (#18982)

Depending on when the context cancellation is picked up, the handler may
return and close the channel before `SubscribeJSON` returns, causing:

```
Feb 05 17:12:00 s3-us-node11 minio[3973657]: panic: send on closed channel
Feb 05 17:12:00 s3-us-node11 minio[3973657]: goroutine 378007076 [running]:
Feb 05 17:12:00 s3-us-node11 minio[3973657]: github.com/minio/minio/internal/pubsub.(*PubSub[...]).SubscribeJSON.func1()
Feb 05 17:12:00 s3-us-node11 minio[3973657]: github.com/minio/minio/internal/pubsub/pubsub.go:139 +0x12d
Feb 05 17:12:00 s3-us-node11 minio[3973657]: created by github.com/minio/minio/internal/pubsub.(*PubSub[...]).SubscribeJSON in goroutine 378010884
Feb 05 17:12:00 s3-us-node11 minio[3973657]: github.com/minio/minio/internal/pubsub/pubsub.go:124 +0x352
```

Wait explicitly for the goroutine to exit.

Bonus: Listen for doneCh when sending so we don't risk getting blocked if
the channel isn't being emptied.
---
 cmd/admin-handlers.go     |  2 +-
 cmd/peer-rest-server.go   |  9 ++++---
 internal/pubsub/pubsub.go | 52 +++++++++++++++++++++++++--------------
 3 files changed, 40 insertions(+), 23 deletions(-)

diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go
index 9fe8c294d..1f9df67b1 100644
--- a/cmd/admin-handlers.go
+++ b/cmd/admin-handlers.go
@@ -1935,7 +1935,7 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
 	peers, _ := newPeerRestClients(globalEndpoints)
 	err = globalTrace.SubscribeJSON(traceOpts.TraceTypes(), traceCh, ctx.Done(), func(entry madmin.TraceInfo) bool {
 		return shouldTrace(entry, traceOpts)
-	})
+	}, nil)
 	if err != nil {
 		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
 		return
diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go
index 4ccc341d1..b0c42a7ce 100644
--- a/cmd/peer-rest-server.go
+++ b/cmd/peer-rest-server.go
@@ -30,6 +30,7 @@ import (
 	"net/url"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -999,12 +1000,13 @@ func (s *peerRESTServer) TraceHandler(ctx context.Context, payload []byte, _ <-c
 	if err != nil {
 		return grid.NewRemoteErr(err)
 	}
+	var wg sync.WaitGroup
 
 	// Trace Publisher uses nonblocking publish and hence does not wait for slow subscribers.
 	// Use buffered channel to take care of burst sends or slow w.Write()
 	err = globalTrace.SubscribeJSON(traceOpts.TraceTypes(), out, ctx.Done(), func(entry madmin.TraceInfo) bool {
 		return shouldTrace(entry, traceOpts)
-	})
+	}, &wg)
 	if err != nil {
 		return grid.NewRemoteErr(err)
 	}
@@ -1013,8 +1015,9 @@ func (s *peerRESTServer) TraceHandler(ctx context.Context, payload []byte, _ <-c
 	if traceOpts.TraceTypes().Contains(madmin.TraceBootstrap) {
 		go globalBootstrapTracer.Publish(ctx, globalTrace)
 	}
-	// Wait for remote to cancel.
-	<-ctx.Done()
+
+	// Wait for remote to cancel and SubscribeJSON to exit.
+	wg.Wait()
 	return nil
 }
 
diff --git a/internal/pubsub/pubsub.go b/internal/pubsub/pubsub.go
index 2b8d4a065..d6880089a 100644
--- a/internal/pubsub/pubsub.go
+++ b/internal/pubsub/pubsub.go
@@ -104,7 +104,7 @@ func (ps *PubSub[T, M]) Subscribe(mask M, subCh chan T, doneCh <-chan struct{},
 }
 
 // SubscribeJSON - Adds a subscriber to pubsub system and returns results with JSON encoding.
-func (ps *PubSub[T, M]) SubscribeJSON(mask M, subCh chan<- []byte, doneCh <-chan struct{}, filter func(entry T) bool) error {
+func (ps *PubSub[T, M]) SubscribeJSON(mask M, subCh chan<- []byte, doneCh <-chan struct{}, filter func(entry T) bool, wg *sync.WaitGroup) error {
 	totalSubs := atomic.AddInt32(&ps.numSubscribers, 1)
 	if ps.maxSubscribers > 0 && totalSubs > ps.maxSubscribers {
 		atomic.AddInt32(&ps.numSubscribers, -1)
@@ -120,40 +120,54 @@ func (ps *PubSub[T, M]) SubscribeJSON(mask M, subCh chan<- []byte, doneCh <-chan
 	combined := Mask(atomic.LoadUint64(&ps.types))
 	combined.Merge(Mask(mask.Mask()))
 	atomic.StoreUint64(&ps.types, uint64(combined))
-
+	if wg != nil {
+		wg.Add(1)
+	}
 	go func() {
+		defer func() {
+			if wg != nil {
+				wg.Done()
+			}
+			// Clean up and de-register the subscriber
+			ps.Lock()
+			defer ps.Unlock()
+			var remainTypes Mask
+			for i, s := range ps.subs {
+				if s == sub {
+					ps.subs = append(ps.subs[:i], ps.subs[i+1:]...)
+				} else {
+					remainTypes.Merge(s.types)
+				}
+			}
+			atomic.StoreUint64(&ps.types, uint64(remainTypes))
+			atomic.AddInt32(&ps.numSubscribers, -1)
+		}()
+
+		// Read from subChT and write to subCh
 		var buf bytes.Buffer
 		enc := json.NewEncoder(&buf)
 		for {
 			select {
 			case <-doneCh:
+				return
 			case v, ok := <-subChT:
 				if !ok {
-					break
+					return
 				}
 				buf.Reset()
 				err := enc.Encode(v)
 				if err != nil {
-					break
+					return
 				}
-				subCh <- append(GetByteBuffer()[:0], buf.Bytes()...)
-				continue
-			}
-			break
-		}
-		ps.Lock()
-		defer ps.Unlock()
-		var remainTypes Mask
-		for i, s := range ps.subs {
-			if s == sub {
-				ps.subs = append(ps.subs[:i], ps.subs[i+1:]...)
-			} else {
-				remainTypes.Merge(s.types)
+				select {
+				case subCh <- append(GetByteBuffer()[:0], buf.Bytes()...):
+					continue
+				case <-doneCh:
+					return
+				}
 			}
 		}
-		atomic.StoreUint64(&ps.types, uint64(remainTypes))
-		atomic.AddInt32(&ps.numSubscribers, -1)
 	}()
 
 	return nil
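
For illustration only (not part of the patch): a minimal, standalone Go sketch of the two techniques the fix applies, using assumed names (`subscribe`, `src`, `out`). The forwarding goroutine registers with an optional `*sync.WaitGroup` so the caller can wait for it to exit before closing its output channel, and every send also selects on `doneCh` so the goroutine can neither block on a full channel nor send on one the caller has already closed.

```go
// Minimal sketch of the pattern above, under assumed names (subscribe, src, out).
package main

import (
	"fmt"
	"sync"
)

// subscribe forwards values from src to out until doneCh is closed or src is drained.
// If wg is non-nil, wg.Done is called when the forwarding goroutine exits, so the
// caller can safely close out only after the goroutine is gone.
func subscribe(src <-chan int, out chan<- int, doneCh <-chan struct{}, wg *sync.WaitGroup) {
	if wg != nil {
		wg.Add(1)
	}
	go func() {
		if wg != nil {
			defer wg.Done()
		}
		for {
			select {
			case <-doneCh:
				return
			case v, ok := <-src:
				if !ok {
					return
				}
				// Listen for doneCh while sending (the "bonus" part of the fix),
				// so a slow or abandoned reader cannot block this goroutine.
				select {
				case out <- v:
				case <-doneCh:
					return
				}
			}
		}
	}()
}

func main() {
	src := make(chan int)
	out := make(chan int, 4)
	doneCh := make(chan struct{})

	var wg sync.WaitGroup
	subscribe(src, out, doneCh, &wg)

	for i := 0; i < 3; i++ {
		src <- i
	}
	close(doneCh) // the "handler" is done; in-flight entries may be dropped

	// Without this wait, the goroutine could still be sending when out is closed,
	// which is exactly the send-on-closed-channel panic the patch fixes.
	wg.Wait()
	close(out)
	for v := range out {
		fmt.Println("received", v)
	}
}
```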