From 0bd563b2e512e7454b5a3035b0e6dd723243e58a Mon Sep 17 00:00:00 2001 From: Anis Eleuch Date: Mon, 6 May 2024 23:12:31 +0100 Subject: [PATCH] Fix a leak in WS object browser (#3325) ``` goroutine 7399330769 [chan send, 70126 minutes]: github.com/minio/console/api.(*wsMinioClient).objectManager.func2.1() github.com/minio/console@v0.46.0/api/ws_objects.go:135 +0x6f0 created by github.com/minio/console/api.(*wsMinioClient).objectManager.func2 in goroutine 7354918912 github.com/minio/console@v0.46.0/api/ws_objects.go:95 +0x45e ``` --- api/ws_objects.go | 94 ++++++++++++++++++++++++----------------------- 1 file changed, 48 insertions(+), 46 deletions(-) diff --git a/api/ws_objects.go b/api/ws_objects.go index aa9208373..507e46931 100644 --- a/api/ws_objects.go +++ b/api/ws_objects.go @@ -43,13 +43,26 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) { writeChannel := make(chan WSResponse) done := make(chan interface{}) + sendWSResponse := func(r WSResponse) { + select { + case writeChannel <- r: + case <-done: + } + } + // Read goroutine go func() { + defer close(writeChannel) for { + select { + case <-done: + return + default: + } + mType, message, err := wsc.conn.readMessage() if err != nil { LogInfo("Error while reading objectManager message", err) - close(done) return } @@ -60,8 +73,6 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) { err := json.Unmarshal(message, &messageRequest) if err != nil { LogInfo("Error on message request unmarshal") - - close(done) return } @@ -74,7 +85,6 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) { const itemsPerBatch = 1000 switch messageRequest.Mode { case "close": - close(done) return case "cancel": // if we have that request id, cancel it @@ -97,12 +107,12 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) { if err != nil { LogInfo(fmt.Sprintf("Error during Objects OptionsParse %s", err.Error())) - writeChannel <- WSResponse{ + sendWSResponse(WSResponse{ RequestID: messageRequest.RequestID, Error: ErrorWithContext(ctx, err), Prefix: messageRequest.Prefix, BucketName: messageRequest.BucketName, - } + }) return } @@ -112,12 +122,12 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) { return } if lsObj.Err != nil { - writeChannel <- WSResponse{ + sendWSResponse(WSResponse{ RequestID: messageRequest.RequestID, Error: ErrorWithContext(ctx, lsObj.Err), Prefix: messageRequest.Prefix, BucketName: messageRequest.BucketName, - } + }) continue } @@ -132,24 +142,24 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) { buffer = append(buffer, objItem) if len(buffer) >= itemsPerBatch { - writeChannel <- WSResponse{ + sendWSResponse(WSResponse{ RequestID: messageRequest.RequestID, Data: buffer, - } + }) buffer = nil } } if len(buffer) > 0 { - writeChannel <- WSResponse{ + sendWSResponse(WSResponse{ RequestID: messageRequest.RequestID, Data: buffer, - } + }) } - writeChannel <- WSResponse{ + sendWSResponse(WSResponse{ RequestID: messageRequest.RequestID, RequestEnd: true, - } + }) // remove the cancellation context delete(cancelContexts, messageRequest.RequestID) @@ -168,12 +178,12 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) { objectRqConfigs, err := getObjectsOptionsFromReq(messageRequest) if err != nil { LogInfo(fmt.Sprintf("Error during Objects OptionsParse %s", err.Error())) - writeChannel <- WSResponse{ + sendWSResponse(WSResponse{ RequestID: messageRequest.RequestID, Error: ErrorWithContext(ctx, err), Prefix: messageRequest.Prefix, BucketName: messageRequest.BucketName, - } + }) return } @@ -182,12 +192,12 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) { s3Client, err := newS3BucketClient(session, objectRqConfigs.BucketName, objectRqConfigs.Prefix, clientIP) if err != nil { - writeChannel <- WSResponse{ + sendWSResponse(WSResponse{ RequestID: messageRequest.RequestID, Error: ErrorWithContext(ctx, err), Prefix: messageRequest.Prefix, BucketName: messageRequest.BucketName, - } + }) cancel() return @@ -199,12 +209,12 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) { for lsObj := range startRewindListing(ctx, mcS3C, objectRqConfigs) { if lsObj.Err != nil { - writeChannel <- WSResponse{ + sendWSResponse(WSResponse{ RequestID: messageRequest.RequestID, Error: ErrorWithContext(ctx, lsObj.Err.ToGoError()), Prefix: messageRequest.Prefix, BucketName: messageRequest.BucketName, - } + }) continue } @@ -222,25 +232,25 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) { buffer = append(buffer, objItem) if len(buffer) >= itemsPerBatch { - writeChannel <- WSResponse{ + sendWSResponse(WSResponse{ RequestID: messageRequest.RequestID, Data: buffer, - } + }) buffer = nil } } if len(buffer) > 0 { - writeChannel <- WSResponse{ + sendWSResponse(WSResponse{ RequestID: messageRequest.RequestID, Data: buffer, - } + }) } - writeChannel <- WSResponse{ + sendWSResponse(WSResponse{ RequestID: messageRequest.RequestID, RequestEnd: true, - } + }) // remove the cancellation context delete(cancelContexts, messageRequest.RequestID) @@ -250,27 +260,19 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) { } }() - // Write goroutine - go func() { - for { - select { - case <-done: - return - case writeM := <-writeChannel: - jsonData, err := json.Marshal(writeM) - if err != nil { - LogInfo("Error while marshaling the response", err) - return - } + defer close(done) - err = wsc.conn.writeMessage(websocket.TextMessage, jsonData) - if err != nil { - LogInfo("Error while writing the message", err) - return - } - } + for writeM := range writeChannel { + jsonData, err := json.Marshal(writeM) + if err != nil { + LogInfo("Error while marshaling the response", err) + return } - }() - <-done + err = wsc.conn.writeMessage(websocket.TextMessage, jsonData) + if err != nil { + LogInfo("Error while writing the message", err) + return + } + } }