From 0db34e4b85ff710ee3faf060bb1c477bc7da8327 Mon Sep 17 00:00:00 2001 From: Anis Eleuch Date: Tue, 18 Apr 2023 16:11:30 +0100 Subject: [PATCH] Listen bucket events to send empty events with new line (#17037) --- cmd/listen-notification-handlers.go | 34 ++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/cmd/listen-notification-handlers.go b/cmd/listen-notification-handlers.go index 7c978a609..6752393dc 100644 --- a/cmd/listen-notification-handlers.go +++ b/cmd/listen-notification-handlers.go @@ -20,6 +20,7 @@ package cmd import ( "encoding/json" "net/http" + "strconv" "time" "github.com/minio/minio/internal/event" @@ -141,8 +142,30 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r peer.Listen(listenCh, ctx.Done(), values) } - keepAliveTicker := time.NewTicker(500 * time.Millisecond) - defer keepAliveTicker.Stop() + var ( + emptyEventTicker <-chan time.Time + keepAliveTicker <-chan time.Time + ) + + if p := values.Get("ping"); p != "" { + pingInterval, err := strconv.Atoi(p) + if err != nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidQueryParams), r.URL) + return + } + if pingInterval < 1 { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidQueryParams), r.URL) + return + } + t := time.NewTicker(time.Duration(pingInterval) * time.Second) + defer t.Stop() + emptyEventTicker = t.C + } else { + // Deprecated Apr 2023 + t := time.NewTicker(500 * time.Millisecond) + defer t.Stop() + keepAliveTicker = t.C + } enc := json.NewEncoder(w) for { @@ -155,7 +178,12 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r // Flush if nothing is queued w.(http.Flusher).Flush() } - case <-keepAliveTicker.C: + case <-emptyEventTicker: + if err := enc.Encode(struct{ Records []event.Event }{}); err != nil { + return + } + w.(http.Flusher).Flush() + case <-keepAliveTicker: if _, err := w.Write([]byte(" ")); err != nil { return }