Fix websocket apis (#127)
Remove ping check and instead use a context that will be canceled if it the client sends a close message or an error occurs on reading. The context will be used to cancel all functions using it.
This commit is contained in:
@@ -22,7 +22,6 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
@@ -32,97 +31,46 @@ import (
|
||||
const logTimeFormat string = "15:04:05 MST 01/02/2006"
|
||||
|
||||
// startConsoleLog starts log of the servers
|
||||
// by first setting a websocket reader that will
|
||||
// check for a heartbeat.
|
||||
//
|
||||
// A WaitGroup is used to handle goroutines and to ensure
|
||||
// all finish in the proper order. If any, sendConsoleLogInfo()
|
||||
// or wsReadCheck() returns, trace should end.
|
||||
func startConsoleLog(conn WSConn, client MinioAdmin) (mError error) {
|
||||
// a WaitGroup waits for a collection of goroutines to finish
|
||||
wg := sync.WaitGroup{}
|
||||
// a cancel context is needed to end all goroutines used
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
func startConsoleLog(ctx context.Context, conn WSConn, client MinioAdmin) error {
|
||||
// TODO: accept parameters as variables
|
||||
// name of node, default = "" (all)
|
||||
node := ""
|
||||
// number of log lines
|
||||
lineCount := 100
|
||||
// type of logs "minio"|"application"|"all" default = "all"
|
||||
logKind := "all"
|
||||
// Start listening on all Console Log activity.
|
||||
logCh := client.getLogs(ctx, node, lineCount, logKind)
|
||||
|
||||
// Set number of goroutines to wait. wg.Wait()
|
||||
// waitsuntil counter is zero (all are done)
|
||||
wg.Add(3)
|
||||
// start go routine for reading websocket heartbeat
|
||||
readErr := wsReadCheck(ctx, &wg, conn)
|
||||
// send Stream of Console Log Info to the ws c.connection
|
||||
logCh := sendConsoleLogInfo(ctx, &wg, conn, client)
|
||||
// If wsReadCheck returns it means that it is not possible to check
|
||||
// ws heartbeat anymore so we stop from doing Console Log, cancel context
|
||||
// for all goroutines.
|
||||
go func(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
if err := <-readErr; err != nil {
|
||||
log.Println("error on wsReadCheck:", err)
|
||||
mError = err
|
||||
}
|
||||
// cancel context for all goroutines.
|
||||
cancel()
|
||||
}(&wg)
|
||||
|
||||
// get logCh err on finish
|
||||
if err := <-logCh; err != nil {
|
||||
mError = err
|
||||
}
|
||||
|
||||
// if logCh closes for any reason,
|
||||
// cancel context for all goroutines
|
||||
cancel()
|
||||
// wait all goroutines to finish
|
||||
wg.Wait()
|
||||
return mError
|
||||
}
|
||||
|
||||
// sendlogInfo sends stream of Console Log Info to the ws connection
|
||||
func sendConsoleLogInfo(ctx context.Context, wg *sync.WaitGroup, conn WSConn, client MinioAdmin) <-chan error {
|
||||
// decrements the WaitGroup counter
|
||||
// by one when the function returns
|
||||
defer wg.Done()
|
||||
ch := make(chan error)
|
||||
go func(ch chan<- error) {
|
||||
defer close(ch)
|
||||
|
||||
// TODO: accept parameters as variables
|
||||
// name of node, default = "" (all)
|
||||
node := ""
|
||||
// number of log lines
|
||||
lineCount := 100
|
||||
// type of logs "minio"|"application"|"all" default = "all"
|
||||
logKind := "all"
|
||||
// Start listening on all Console Log activity.
|
||||
logCh := client.getLogs(ctx, node, lineCount, logKind)
|
||||
|
||||
for logInfo := range logCh {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case logInfo, ok := <-logCh:
|
||||
// zero value returned because the channel is closed and empty
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if logInfo.Err != nil {
|
||||
log.Println("error on console logs:", logInfo.Err)
|
||||
ch <- logInfo.Err
|
||||
return
|
||||
return logInfo.Err
|
||||
}
|
||||
|
||||
// Serialize message to be sent
|
||||
bytes, err := json.Marshal(serializeConsoleLogInfo(&logInfo))
|
||||
if err != nil {
|
||||
fmt.Println("error on json.Marshal:", err)
|
||||
ch <- err
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// Send Message through websocket connection
|
||||
err = conn.writeMessage(websocket.TextMessage, bytes)
|
||||
if err != nil {
|
||||
log.Println("error writeMessage:", err)
|
||||
ch <- err
|
||||
return
|
||||
return err
|
||||
}
|
||||
}
|
||||
}(ch)
|
||||
|
||||
return ch
|
||||
}
|
||||
}
|
||||
|
||||
func serializeConsoleLogInfo(l *madmin.LogInfo) (logInfo madmin.LogInfo) {
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
@@ -39,11 +38,13 @@ func TestAdminConsoleLog(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
adminClient := adminClientMock{}
|
||||
mockWSConn := mockConn{}
|
||||
function := "startConsoleLog()"
|
||||
function := "startConsoleLog(ctx, )"
|
||||
ctx := context.Background()
|
||||
|
||||
testReceiver := make(chan madmin.LogInfo, 5)
|
||||
textToReceive := "test message"
|
||||
testStreamSize := 5
|
||||
isClosed := false // testReceiver is closed?
|
||||
|
||||
// Test-1: Serve Console with no errors until Console finishes sending
|
||||
// define mock function behavior for minio server Console
|
||||
@@ -63,10 +64,6 @@ func TestAdminConsoleLog(t *testing.T) {
|
||||
}(ch)
|
||||
return ch
|
||||
}
|
||||
// mock function of conn.ReadMessage(), no error on read
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, nil
|
||||
}
|
||||
writesCount := 1
|
||||
// mock connection WriteMessage() no error
|
||||
connWriteMessageMock = func(messageType int, data []byte) error {
|
||||
@@ -74,15 +71,17 @@ func TestAdminConsoleLog(t *testing.T) {
|
||||
var t madmin.LogInfo
|
||||
_ = json.Unmarshal(data, &t)
|
||||
if writesCount == testStreamSize {
|
||||
// for testing we need to close the receiver channel
|
||||
close(testReceiver)
|
||||
if !isClosed {
|
||||
close(testReceiver)
|
||||
isClosed = true
|
||||
}
|
||||
return nil
|
||||
}
|
||||
testReceiver <- t
|
||||
writesCount++
|
||||
return nil
|
||||
}
|
||||
if err := startConsoleLog(mockWSConn, adminClient); err != nil {
|
||||
if err := startConsoleLog(ctx, mockWSConn, adminClient); err != nil {
|
||||
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
|
||||
}
|
||||
// check that the TestReceiver got the same number of data from Console.
|
||||
@@ -94,50 +93,11 @@ func TestAdminConsoleLog(t *testing.T) {
|
||||
connWriteMessageMock = func(messageType int, data []byte) error {
|
||||
return fmt.Errorf("error on write")
|
||||
}
|
||||
if err := startConsoleLog(mockWSConn, adminClient); assert.Error(err) {
|
||||
if err := startConsoleLog(ctx, mockWSConn, adminClient); assert.Error(err) {
|
||||
assert.Equal("error on write", err.Error())
|
||||
}
|
||||
|
||||
// Test-3: error happens while reading, unexpected Close Error should return error.
|
||||
connWriteMessageMock = func(messageType int, data []byte) error {
|
||||
return nil
|
||||
}
|
||||
// mock function of conn.ReadMessage(), returns unexpected Close Error CloseAbnormalClosure
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseAbnormalClosure, Text: ""}
|
||||
}
|
||||
if err := startConsoleLog(mockWSConn, adminClient); assert.Error(err) {
|
||||
assert.Equal("websocket: close 1006 (abnormal closure)", err.Error())
|
||||
}
|
||||
|
||||
// Test-4: error happens while reading, expected Close Error NormalClosure
|
||||
// expected Close Error should not return an error, just end Console.
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: ""}
|
||||
}
|
||||
if err := startConsoleLog(mockWSConn, adminClient); err != nil {
|
||||
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
|
||||
}
|
||||
|
||||
// Test-5: error happens while reading, expected Close Error CloseGoingAway
|
||||
// expected Close Error should not return an error, just return.
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseGoingAway, Text: ""}
|
||||
}
|
||||
if err := startConsoleLog(mockWSConn, adminClient); err != nil {
|
||||
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
|
||||
}
|
||||
|
||||
// Test-6: error happens while reading, non Close Error Type should be returned as
|
||||
// error
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, fmt.Errorf("error on read")
|
||||
}
|
||||
if err := startConsoleLog(mockWSConn, adminClient); assert.Error(err) {
|
||||
assert.Equal("error on read", err.Error())
|
||||
}
|
||||
|
||||
// Test-7: error happens on GetLogs Minio, Console should stop
|
||||
// Test-3: error happens on GetLogs Minio, Console should stop
|
||||
// and error shall be returned.
|
||||
minioGetLogsMock = func(ctx context.Context, node string, lineCnt int, logKind string) <-chan madmin.LogInfo {
|
||||
ch := make(chan madmin.LogInfo)
|
||||
@@ -156,12 +116,10 @@ func TestAdminConsoleLog(t *testing.T) {
|
||||
}(ch)
|
||||
return ch
|
||||
}
|
||||
// mock function of conn.ReadMessage(), no error on read, should stay unless
|
||||
// context is done.
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, nil
|
||||
connWriteMessageMock = func(messageType int, data []byte) error {
|
||||
return nil
|
||||
}
|
||||
if err := startConsoleLog(mockWSConn, adminClient); assert.Error(err) {
|
||||
if err := startConsoleLog(ctx, mockWSConn, adminClient); assert.Error(err) {
|
||||
assert.Equal("error on Console", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
@@ -50,93 +49,40 @@ type callStats struct {
|
||||
}
|
||||
|
||||
// startTraceInfo starts trace of the servers
|
||||
// by first setting a websocket reader that will
|
||||
// check for a heartbeat.
|
||||
//
|
||||
// A WaitGroup is used to handle goroutines and to ensure
|
||||
// all finish in the proper order. If any, sendTraceInfo()
|
||||
// or wsReadCheck() returns, trace should end.
|
||||
func startTraceInfo(conn WSConn, client MinioAdmin) (mError error) {
|
||||
// a WaitGroup waits for a collection of goroutines to finish
|
||||
wg := sync.WaitGroup{}
|
||||
// a cancel context is needed to end all goroutines used
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Set number of goroutines to wait. wg.Wait()
|
||||
// waitsuntil counter is zero (all are done)
|
||||
wg.Add(3)
|
||||
// start go routine for reading websocket heartbeat
|
||||
readErr := wsReadCheck(ctx, &wg, conn)
|
||||
// send Stream of Trace Info to the ws c.connection
|
||||
traceCh := sendTraceInfo(ctx, &wg, conn, client)
|
||||
// If wsReadCheck returns it means that it is not possible to check
|
||||
// ws heartbeat anymore so we stop from doing trace, cancel context
|
||||
// for all goroutines.
|
||||
go func(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
if err := <-readErr; err != nil {
|
||||
log.Println("error on wsReadCheck:", err)
|
||||
mError = err
|
||||
}
|
||||
// cancel context for all goroutines.
|
||||
cancel()
|
||||
}(&wg)
|
||||
|
||||
// get traceCh error on finish
|
||||
if err := <-traceCh; err != nil {
|
||||
mError = err
|
||||
}
|
||||
|
||||
// if traceCh closes for any reason,
|
||||
// cancel context for all goroutines
|
||||
cancel()
|
||||
// wait all goroutines to finish
|
||||
wg.Wait()
|
||||
return mError
|
||||
}
|
||||
|
||||
// sendTraceInfo sends stream of Trace Info to the ws connection
|
||||
func sendTraceInfo(ctx context.Context, wg *sync.WaitGroup, conn WSConn, client MinioAdmin) <-chan error {
|
||||
// decrements the WaitGroup counter
|
||||
// by one when the function returns
|
||||
defer wg.Done()
|
||||
ch := make(chan error)
|
||||
go func(ch chan<- error) {
|
||||
defer close(ch)
|
||||
|
||||
// trace all traffic
|
||||
allTraffic := true
|
||||
// Trace failed requests only
|
||||
errOnly := false
|
||||
// Start listening on all trace activity.
|
||||
traceCh := client.serviceTrace(ctx, allTraffic, errOnly)
|
||||
|
||||
for traceInfo := range traceCh {
|
||||
func startTraceInfo(ctx context.Context, conn WSConn, client MinioAdmin) error {
|
||||
// trace all traffic
|
||||
allTraffic := true
|
||||
// Trace failed requests only
|
||||
errOnly := false
|
||||
// Start listening on all trace activity.
|
||||
traceCh := client.serviceTrace(ctx, allTraffic, errOnly)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case traceInfo, ok := <-traceCh:
|
||||
// zero value returned because the channel is closed and empty
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if traceInfo.Err != nil {
|
||||
log.Println("error on serviceTrace:", traceInfo.Err)
|
||||
ch <- traceInfo.Err
|
||||
return
|
||||
return traceInfo.Err
|
||||
}
|
||||
// Serialize message to be sent
|
||||
traceInfoBytes, err := json.Marshal(shortTrace(&traceInfo))
|
||||
if err != nil {
|
||||
fmt.Println("error on json.Marshal:", err)
|
||||
ch <- err
|
||||
return
|
||||
return err
|
||||
}
|
||||
// Send Message through websocket connection
|
||||
err = conn.writeMessage(websocket.TextMessage, traceInfoBytes)
|
||||
if err != nil {
|
||||
log.Println("error writeMessage:", err)
|
||||
ch <- err
|
||||
return
|
||||
return err
|
||||
}
|
||||
}
|
||||
// TODO: verbose
|
||||
}(ch)
|
||||
|
||||
return ch
|
||||
}
|
||||
}
|
||||
|
||||
// shortTrace creates a shorter Trace Info message.
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
trace "github.com/minio/minio/pkg/trace"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -40,11 +39,13 @@ func TestAdminTrace(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
adminClient := adminClientMock{}
|
||||
mockWSConn := mockConn{}
|
||||
function := "startTraceInfo()"
|
||||
function := "startTraceInfo(ctx, )"
|
||||
ctx := context.Background()
|
||||
|
||||
testReceiver := make(chan shortTraceMsg, 5)
|
||||
textToReceive := "test"
|
||||
testStreamSize := 5
|
||||
isClosed := false // testReceiver is closed?
|
||||
|
||||
// Test-1: Serve Trace with no errors until trace finishes sending
|
||||
// define mock function behavior for minio server Trace
|
||||
@@ -64,10 +65,6 @@ func TestAdminTrace(t *testing.T) {
|
||||
}(ch)
|
||||
return ch
|
||||
}
|
||||
// mock function of conn.ReadMessage(), no error on read
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, nil
|
||||
}
|
||||
writesCount := 1
|
||||
// mock connection WriteMessage() no error
|
||||
connWriteMessageMock = func(messageType int, data []byte) error {
|
||||
@@ -76,14 +73,17 @@ func TestAdminTrace(t *testing.T) {
|
||||
_ = json.Unmarshal(data, &t)
|
||||
if writesCount == testStreamSize {
|
||||
// for testing we need to close the receiver channel
|
||||
close(testReceiver)
|
||||
if !isClosed {
|
||||
close(testReceiver)
|
||||
isClosed = true
|
||||
}
|
||||
return nil
|
||||
}
|
||||
testReceiver <- t
|
||||
writesCount++
|
||||
return nil
|
||||
}
|
||||
if err := startTraceInfo(mockWSConn, adminClient); err != nil {
|
||||
if err := startTraceInfo(ctx, mockWSConn, adminClient); err != nil {
|
||||
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
|
||||
}
|
||||
// check that the TestReceiver got the same number of data from trace.
|
||||
@@ -95,50 +95,11 @@ func TestAdminTrace(t *testing.T) {
|
||||
connWriteMessageMock = func(messageType int, data []byte) error {
|
||||
return fmt.Errorf("error on write")
|
||||
}
|
||||
if err := startTraceInfo(mockWSConn, adminClient); assert.Error(err) {
|
||||
if err := startTraceInfo(ctx, mockWSConn, adminClient); assert.Error(err) {
|
||||
assert.Equal("error on write", err.Error())
|
||||
}
|
||||
|
||||
// Test-3: error happens while reading, unexpected Close Error should return error.
|
||||
connWriteMessageMock = func(messageType int, data []byte) error {
|
||||
return nil
|
||||
}
|
||||
// mock function of conn.ReadMessage(), returns unexpected Close Error CloseAbnormalClosure
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseAbnormalClosure, Text: ""}
|
||||
}
|
||||
if err := startTraceInfo(mockWSConn, adminClient); assert.Error(err) {
|
||||
assert.Equal("websocket: close 1006 (abnormal closure)", err.Error())
|
||||
}
|
||||
|
||||
// Test-4: error happens while reading, expected Close Error NormalClosure
|
||||
// expected Close Error should not return an error, just end trace.
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: ""}
|
||||
}
|
||||
if err := startTraceInfo(mockWSConn, adminClient); err != nil {
|
||||
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
|
||||
}
|
||||
|
||||
// Test-5: error happens while reading, expected Close Error CloseGoingAway
|
||||
// expected Close Error should not return an error, just return.
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseGoingAway, Text: ""}
|
||||
}
|
||||
if err := startTraceInfo(mockWSConn, adminClient); err != nil {
|
||||
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
|
||||
}
|
||||
|
||||
// Test-6: error happens while reading, non Close Error Type should be returned as
|
||||
// error
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, fmt.Errorf("error on read")
|
||||
}
|
||||
if err := startTraceInfo(mockWSConn, adminClient); assert.Error(err) {
|
||||
assert.Equal("error on read", err.Error())
|
||||
}
|
||||
|
||||
// Test-7: error happens on serviceTrace Minio, trace should stop
|
||||
// Test-3: error happens on serviceTrace Minio, trace should stop
|
||||
// and error shall be returned.
|
||||
minioServiceTraceMock = func(ctx context.Context, allTrace, errTrace bool) <-chan madmin.ServiceTraceInfo {
|
||||
ch := make(chan madmin.ServiceTraceInfo)
|
||||
@@ -157,12 +118,10 @@ func TestAdminTrace(t *testing.T) {
|
||||
}(ch)
|
||||
return ch
|
||||
}
|
||||
// mock function of conn.ReadMessage(), no error on read, should stay unless
|
||||
// context is done.
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, nil
|
||||
connWriteMessageMock = func(messageType int, data []byte) error {
|
||||
return nil
|
||||
}
|
||||
if err := startTraceInfo(mockWSConn, adminClient); assert.Error(err) {
|
||||
if err := startTraceInfo(ctx, mockWSConn, adminClient); assert.Error(err) {
|
||||
assert.Equal("error on trace", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,6 @@ import (
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
mc "github.com/minio/mc/cmd"
|
||||
@@ -35,106 +34,48 @@ type watchOptions struct {
|
||||
mc.WatchOptions
|
||||
}
|
||||
|
||||
// startWatch starts by setting a websocket reader that
|
||||
// will check for a heartbeat.
|
||||
//
|
||||
// A WaitGroup is used to handle goroutines and to ensure
|
||||
// all finish in the proper order. If any, sendWatchInfo()
|
||||
// or wsReadCheck() returns, watch should end.
|
||||
func startWatch(conn WSConn, client MCS3Client, options watchOptions) (mError error) {
|
||||
// a WaitGroup waits for a collection of goroutines to finish
|
||||
wg := sync.WaitGroup{}
|
||||
// a cancel context is needed to end all goroutines used
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Set number of goroutines to wait. wg.Wait()
|
||||
// waits until counter is zero (all are done)
|
||||
wg.Add(3)
|
||||
// start go routine for reading websocket heartbeat
|
||||
readErr := wsReadCheck(ctx, &wg, conn)
|
||||
// send Stream of watch events to the ws c.connection
|
||||
ch := sendWatchInfo(ctx, &wg, conn, client, options)
|
||||
// If wsReadCheck returns it means that it is not possible to check
|
||||
// ws heartbeat anymore so we stop from doing Watch, cancel context
|
||||
// for all goroutines.
|
||||
go func(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
if err := <-readErr; err != nil {
|
||||
log.Println("error on wsReadCheck:", err)
|
||||
mError = err
|
||||
}
|
||||
// cancel context for all goroutines.
|
||||
cancel()
|
||||
}(&wg)
|
||||
|
||||
if err := <-ch; err != nil {
|
||||
mError = err
|
||||
func startWatch(ctx context.Context, conn WSConn, wsc MCS3Client, options watchOptions) error {
|
||||
wo, pErr := wsc.watch(options.WatchOptions)
|
||||
if pErr != nil {
|
||||
fmt.Println("error initializing watch:", pErr.Cause)
|
||||
return pErr.Cause
|
||||
}
|
||||
|
||||
// if ch closes for any reason,
|
||||
// cancel context for all goroutines
|
||||
cancel()
|
||||
// wait all goroutines to finish
|
||||
wg.Wait()
|
||||
return mError
|
||||
}
|
||||
|
||||
// sendWatchInfo sends stream of Watch Event to the ws connection
|
||||
func sendWatchInfo(ctx context.Context, wg *sync.WaitGroup, conn WSConn, wsc MCS3Client, options watchOptions) <-chan error {
|
||||
// decrements the WaitGroup counter
|
||||
// by one when the function returns
|
||||
defer wg.Done()
|
||||
ch := make(chan error)
|
||||
go func(ch chan<- error) {
|
||||
defer close(ch)
|
||||
wo, pErr := wsc.watch(options.WatchOptions)
|
||||
if pErr != nil {
|
||||
fmt.Println("error initializing watch:", pErr.Cause)
|
||||
ch <- pErr.Cause
|
||||
return
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
close(wo.DoneChan)
|
||||
return
|
||||
case events, ok := <-wo.Events():
|
||||
// zero value returned because the channel is closed and empty
|
||||
if !ok {
|
||||
return
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
close(wo.DoneChan)
|
||||
return nil
|
||||
case events, ok := <-wo.Events():
|
||||
// zero value returned because the channel is closed and empty
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
for _, event := range events {
|
||||
// Serialize message to be sent
|
||||
bytes, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
log.Println("error on json.Marshal:", err)
|
||||
return err
|
||||
}
|
||||
for _, event := range events {
|
||||
// Serialize message to be sent
|
||||
bytes, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
fmt.Println("error on json.Marshal:", err)
|
||||
ch <- err
|
||||
return
|
||||
}
|
||||
// Send Message through websocket connection
|
||||
err = conn.writeMessage(websocket.TextMessage, bytes)
|
||||
if err != nil {
|
||||
log.Println("error writeMessage:", err)
|
||||
ch <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
case pErr, ok := <-wo.Errors():
|
||||
// zero value returned because the channel is closed and empty
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if pErr != nil {
|
||||
log.Println("error on watch:", pErr.Cause)
|
||||
ch <- pErr.Cause
|
||||
return
|
||||
|
||||
// Send Message through websocket connection
|
||||
err = conn.writeMessage(websocket.TextMessage, bytes)
|
||||
if err != nil {
|
||||
log.Println("error writeMessage:", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
case pErr, ok := <-wo.Errors():
|
||||
// zero value returned because the channel is closed and empty
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if pErr != nil {
|
||||
log.Println("error on watch:", pErr.Cause)
|
||||
return pErr.Cause
|
||||
|
||||
}
|
||||
}
|
||||
}(ch)
|
||||
return ch
|
||||
}
|
||||
}
|
||||
|
||||
// getOptionsFromReq gets bucket name, events, prefix, suffix from a websocket
|
||||
|
||||
@@ -17,13 +17,13 @@
|
||||
package restapi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
mc "github.com/minio/mc/cmd"
|
||||
"github.com/minio/mc/pkg/probe"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -42,11 +42,13 @@ func TestWatch(t *testing.T) {
|
||||
|
||||
client := s3ClientMock{}
|
||||
mockWSConn := mockConn{}
|
||||
ctx := context.Background()
|
||||
|
||||
function := "startWatch()"
|
||||
function := "startWatch(ctx, )"
|
||||
|
||||
testStreamSize := 5
|
||||
testReceiver := make(chan []mc.EventInfo, testStreamSize)
|
||||
isClosed := false // testReceiver is closed?
|
||||
textToReceive := "test message"
|
||||
testOptions := watchOptions{}
|
||||
testOptions.BucketName = "bucktest"
|
||||
@@ -77,10 +79,6 @@ func TestWatch(t *testing.T) {
|
||||
}(wo)
|
||||
return wo, nil
|
||||
}
|
||||
// mock function of conn.ReadMessage(), no error on read
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, nil
|
||||
}
|
||||
writesCount := 1
|
||||
// mock connection WriteMessage() no error
|
||||
connWriteMessageMock = func(messageType int, data []byte) error {
|
||||
@@ -89,14 +87,17 @@ func TestWatch(t *testing.T) {
|
||||
_ = json.Unmarshal(data, &t)
|
||||
if writesCount == testStreamSize {
|
||||
// for testing we need to close the receiver channel
|
||||
close(testReceiver)
|
||||
if !isClosed {
|
||||
close(testReceiver)
|
||||
isClosed = true
|
||||
}
|
||||
return nil
|
||||
}
|
||||
testReceiver <- t
|
||||
writesCount++
|
||||
return nil
|
||||
}
|
||||
if err := startWatch(mockWSConn, client, testOptions); err != nil {
|
||||
if err := startWatch(ctx, mockWSConn, client, testOptions); err != nil {
|
||||
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
|
||||
}
|
||||
// check that the TestReceiver got the same number of data from Console.
|
||||
@@ -110,50 +111,11 @@ func TestWatch(t *testing.T) {
|
||||
connWriteMessageMock = func(messageType int, data []byte) error {
|
||||
return fmt.Errorf("error on write")
|
||||
}
|
||||
if err := startWatch(mockWSConn, client, testOptions); assert.Error(err) {
|
||||
if err := startWatch(ctx, mockWSConn, client, testOptions); assert.Error(err) {
|
||||
assert.Equal("error on write", err.Error())
|
||||
}
|
||||
|
||||
// Test-3: error happens while reading, unexpected Close Error should return error.
|
||||
connWriteMessageMock = func(messageType int, data []byte) error {
|
||||
return nil
|
||||
}
|
||||
// mock function of conn.ReadMessage(), returns unexpected Close Error CloseAbnormalClosure
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseAbnormalClosure, Text: ""}
|
||||
}
|
||||
if err := startWatch(mockWSConn, client, testOptions); assert.Error(err) {
|
||||
assert.Equal("websocket: close 1006 (abnormal closure)", err.Error())
|
||||
}
|
||||
|
||||
// Test-4: error happens while reading, expected Close Error NormalClosure
|
||||
// expected Close Error should not return an error, just end Console.
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: ""}
|
||||
}
|
||||
if err := startWatch(mockWSConn, client, testOptions); err != nil {
|
||||
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
|
||||
}
|
||||
|
||||
// Test-5: error happens while reading, expected Close Error CloseGoingAway
|
||||
// expected Close Error should not return an error, just return.
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseGoingAway, Text: ""}
|
||||
}
|
||||
if err := startWatch(mockWSConn, client, testOptions); err != nil {
|
||||
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
|
||||
}
|
||||
|
||||
// Test-6: error happens while reading, non Close Error Type should be returned as
|
||||
// error
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, fmt.Errorf("error on read")
|
||||
}
|
||||
if err := startWatch(mockWSConn, client, testOptions); assert.Error(err) {
|
||||
assert.Equal("error on read", err.Error())
|
||||
}
|
||||
|
||||
// Test-7: error happens on Watch, watch should stop
|
||||
// Test-3: error happens on Watch, watch should stop
|
||||
// and error shall be returned.
|
||||
mcWatchMock = func(params mc.WatchOptions) (*mc.WatchObject, *probe.Error) {
|
||||
wo := &mc.WatchObject{
|
||||
@@ -178,25 +140,23 @@ func TestWatch(t *testing.T) {
|
||||
}(wo)
|
||||
return wo, nil
|
||||
}
|
||||
// mock function of conn.ReadMessage(), no error on read, should stay unless
|
||||
// context is done.
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, nil
|
||||
connWriteMessageMock = func(messageType int, data []byte) error {
|
||||
return nil
|
||||
}
|
||||
if err := startWatch(mockWSConn, client, testOptions); assert.Error(err) {
|
||||
if err := startWatch(ctx, mockWSConn, client, testOptions); assert.Error(err) {
|
||||
assert.Equal("error on Watch", err.Error())
|
||||
}
|
||||
|
||||
// Test-8: error happens on Watch, watch should stop
|
||||
// Test-4: error happens on Watch, watch should stop
|
||||
// and error shall be returned.
|
||||
mcWatchMock = func(params mc.WatchOptions) (*mc.WatchObject, *probe.Error) {
|
||||
return nil, &probe.Error{Cause: fmt.Errorf("error on Watch")}
|
||||
}
|
||||
if err := startWatch(mockWSConn, client, testOptions); assert.Error(err) {
|
||||
if err := startWatch(ctx, mockWSConn, client, testOptions); assert.Error(err) {
|
||||
assert.Equal("error on Watch", err.Error())
|
||||
}
|
||||
|
||||
// Test-9: return nil on error on Watch
|
||||
// Test-5: return nil on error on Watch
|
||||
mcWatchMock = func(params mc.WatchOptions) (*mc.WatchObject, *probe.Error) {
|
||||
wo := &mc.WatchObject{
|
||||
EventInfoChan: make(chan []mc.EventInfo),
|
||||
@@ -221,7 +181,7 @@ func TestWatch(t *testing.T) {
|
||||
}(wo)
|
||||
return wo, nil
|
||||
}
|
||||
if err := startWatch(mockWSConn, client, testOptions); err != nil {
|
||||
if err := startWatch(ctx, mockWSConn, client, testOptions); err != nil {
|
||||
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
|
||||
}
|
||||
// check that the TestReceiver got the same number of data from Console.
|
||||
@@ -231,7 +191,7 @@ func TestWatch(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Test-9: getOptionsFromReq return parameters from path
|
||||
// Test-6: getOptionsFromReq return parameters from path
|
||||
u, err := url.Parse("http://localhost/api/v1/watch/bucket1?prefix=&suffix=.jpg&events=put,get")
|
||||
if err != nil {
|
||||
t.Errorf("Failed on %s:, error occurred: %s", "url.Parse()", err.Error())
|
||||
@@ -251,7 +211,7 @@ func TestWatch(t *testing.T) {
|
||||
assert.Equal(expectedOptions.Suffix, opts.Suffix)
|
||||
assert.Equal(expectedOptions.Events, opts.Events)
|
||||
|
||||
// Test-9: getOptionsFromReq return default events if not defined
|
||||
// Test-7: getOptionsFromReq return default events if not defined
|
||||
u, err = url.Parse("http://localhost/api/v1/watch/bucket1?prefix=&suffix=.jpg&events=")
|
||||
if err != nil {
|
||||
t.Errorf("Failed on %s:, error occurred: %s", "url.Parse()", err.Error())
|
||||
@@ -271,7 +231,7 @@ func TestWatch(t *testing.T) {
|
||||
assert.Equal(expectedOptions.Suffix, opts.Suffix)
|
||||
assert.Equal(expectedOptions.Events, opts.Events)
|
||||
|
||||
// Test-10: getOptionsFromReq return default events if not defined
|
||||
// Test-8: getOptionsFromReq return default events if not defined
|
||||
u, err = url.Parse("http://localhost/api/v1/watch/bucket2?prefix=&suffix=")
|
||||
if err != nil {
|
||||
t.Errorf("Failed on %s:, error occurred: %s", "url.Parse()", err.Error())
|
||||
|
||||
@@ -22,8 +22,6 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-openapi/errors"
|
||||
"github.com/gorilla/websocket"
|
||||
@@ -39,12 +37,6 @@ var upgrader = websocket.Upgrader{
|
||||
const (
|
||||
// websocket base path
|
||||
wsBasePath = "/ws"
|
||||
|
||||
// Time allowed to read the next pong message from the peer.
|
||||
pingWait = 15 * time.Second
|
||||
|
||||
// Maximum message size allowed from peer. 0 = unlimited
|
||||
maxMessageSize = 512
|
||||
)
|
||||
|
||||
// MCSWebsocketAdmin interface of a Websocket Client
|
||||
@@ -78,9 +70,6 @@ type wsS3Client struct {
|
||||
type WSConn interface {
|
||||
writeMessage(messageType int, data []byte) error
|
||||
close() error
|
||||
setReadLimit(limit int64)
|
||||
setReadDeadline(t time.Time) error
|
||||
setPongHandler(h func(appData string) error)
|
||||
readMessage() (messageType int, p []byte, err error)
|
||||
}
|
||||
|
||||
@@ -99,17 +88,6 @@ func (c wsConn) close() error {
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
func (c wsConn) setReadLimit(limit int64) {
|
||||
c.conn.SetReadLimit(limit)
|
||||
}
|
||||
|
||||
func (c wsConn) setReadDeadline(t time.Time) error {
|
||||
return c.conn.SetReadDeadline(t)
|
||||
}
|
||||
|
||||
func (c wsConn) setPongHandler(h func(appData string) error) {
|
||||
c.conn.SetPongHandler(h)
|
||||
}
|
||||
func (c wsConn) readMessage() (messageType int, p []byte, err error) {
|
||||
return c.conn.ReadMessage()
|
||||
}
|
||||
@@ -218,54 +196,35 @@ func newWebSocketS3Client(conn *websocket.Conn, jwt, bucketName string) (*wsS3Cl
|
||||
return wsS3Client, nil
|
||||
}
|
||||
|
||||
// wsReadCheck ensures that the client is sending a message
|
||||
// every `pingWait` seconds. If deadline exceeded or an error
|
||||
// happened this will return, meaning it won't be able to ensure
|
||||
// client heartbeat.
|
||||
func wsReadCheck(ctx context.Context, wg *sync.WaitGroup, conn WSConn) chan error {
|
||||
// decrements the WaitGroup counter by one when the function returns
|
||||
defer wg.Done()
|
||||
ch := make(chan error)
|
||||
go func(ch chan error) {
|
||||
defer close(ch)
|
||||
|
||||
// set initial Limits and deadlines for the Reader
|
||||
conn.setReadLimit(maxMessageSize)
|
||||
conn.setReadDeadline(time.Now().Add(pingWait))
|
||||
conn.setPongHandler(func(string) error { conn.setReadDeadline(time.Now().Add(pingWait)); return nil })
|
||||
|
||||
// wsReadClientCtx reads the messages that come from the client
|
||||
// if the client sends a Close Message the context will be
|
||||
// canceled. If the connection is closed the goroutine inside
|
||||
// will return.
|
||||
func wsReadClientCtx(conn WSConn) context.Context {
|
||||
// a cancel context is needed to end all goroutines used
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go func() {
|
||||
defer cancel()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Println("context done inside wsReadCheck")
|
||||
return
|
||||
default:
|
||||
_, _, err := conn.readMessage()
|
||||
if err != nil {
|
||||
// if error of type websocket.CloseError and is Unexpected
|
||||
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
|
||||
log.Println("error unexpected CloseError on ReadMessage:", err)
|
||||
ch <- err
|
||||
return
|
||||
}
|
||||
// Not all errors are of type websocket.CloseError.
|
||||
// If not of type websocket.CloseError return error
|
||||
if _, ok := err.(*websocket.CloseError); !ok {
|
||||
log.Println("error on ReadMessage:", err)
|
||||
ch <- err
|
||||
return
|
||||
}
|
||||
// else is an expected Close Error
|
||||
log.Println("closed conn.ReadMessage:", err)
|
||||
_, _, err := conn.readMessage()
|
||||
if err != nil {
|
||||
// if error of type websocket.CloseError and is Unexpected
|
||||
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
|
||||
log.Println("error unexpected CloseError on ReadMessage:", err)
|
||||
return
|
||||
}
|
||||
// Reset Read Deadline after each Read
|
||||
conn.setReadDeadline(time.Now().Add(pingWait))
|
||||
conn.setPongHandler(func(string) error { conn.setReadDeadline(time.Now().Add(pingWait)); return nil })
|
||||
// Not all errors are of type websocket.CloseError.
|
||||
if _, ok := err.(*websocket.CloseError); !ok {
|
||||
log.Println("error on ReadMessage:", err)
|
||||
return
|
||||
}
|
||||
// else is an expected Close Error
|
||||
log.Println("closed conn.ReadMessage:", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}(ch)
|
||||
return ch
|
||||
}()
|
||||
return ctx
|
||||
}
|
||||
|
||||
// trace serves madmin.ServiceTraceInfo
|
||||
@@ -278,10 +237,13 @@ func (wsc *wsAdminClient) trace() {
|
||||
}()
|
||||
log.Println("trace started")
|
||||
|
||||
err := startTraceInfo(wsc.conn, wsc.client)
|
||||
ctx := wsReadClientCtx(wsc.conn)
|
||||
|
||||
err := startTraceInfo(ctx, wsc.conn, wsc.client)
|
||||
// Send Connection Close Message indicating the Status Code
|
||||
// see https://tools.ietf.org/html/rfc6455#page-45
|
||||
if err != nil {
|
||||
log.Println("err:", err)
|
||||
// If connection exceeded read deadline send Close
|
||||
// Message Policy Violation code since we don't want
|
||||
// to let the receiver figure out the read deadline.
|
||||
@@ -309,7 +271,9 @@ func (wsc *wsAdminClient) console() {
|
||||
}()
|
||||
log.Println("console logs started")
|
||||
|
||||
err := startConsoleLog(wsc.conn, wsc.client)
|
||||
ctx := wsReadClientCtx(wsc.conn)
|
||||
|
||||
err := startConsoleLog(ctx, wsc.conn, wsc.client)
|
||||
// Send Connection Close Message indicating the Status Code
|
||||
// see https://tools.ietf.org/html/rfc6455#page-45
|
||||
if err != nil {
|
||||
@@ -338,7 +302,9 @@ func (wsc *wsS3Client) watch(params watchOptions) {
|
||||
}()
|
||||
log.Println("watch started")
|
||||
|
||||
err := startWatch(wsc.conn, wsc.client, params)
|
||||
ctx := wsReadClientCtx(wsc.conn)
|
||||
|
||||
err := startWatch(ctx, wsc.conn, wsc.client, params)
|
||||
// Send Connection Close Message indicating the Status Code
|
||||
// see https://tools.ietf.org/html/rfc6455#page-45
|
||||
if err != nil {
|
||||
|
||||
@@ -17,7 +17,10 @@
|
||||
package restapi
|
||||
|
||||
import (
|
||||
"time"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
// Common mocks for WSConn interface
|
||||
@@ -25,12 +28,6 @@ import (
|
||||
var connWriteMessageMock func(messageType int, data []byte) error
|
||||
var connReadMessageMock func() (messageType int, p []byte, err error)
|
||||
|
||||
// Uncomment and implement if needed
|
||||
// var connCloseMock func() error
|
||||
// var connSetReadLimitMock func(limit int64)
|
||||
// var connSetReadDeadlineMock func(t time.Time) error
|
||||
// var connSetPongHandlerMock func(h func(appData string) error)
|
||||
|
||||
// The Conn type represents a WebSocket connection.
|
||||
type mockConn struct{}
|
||||
|
||||
@@ -44,10 +41,33 @@ func (c mockConn) readMessage() (messageType int, p []byte, err error) {
|
||||
func (c mockConn) close() error {
|
||||
return nil
|
||||
}
|
||||
func (c mockConn) setReadLimit(limit int64) {
|
||||
}
|
||||
func (c mockConn) setReadDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
func (c mockConn) setPongHandler(h func(appData string) error) {
|
||||
|
||||
func TestWSHandle(t *testing.T) {
|
||||
// assert := assert.New(t)
|
||||
mockWSConn := mockConn{}
|
||||
|
||||
// mock function of conn.ReadMessage(), returns unexpected Close Error CloseAbnormalClosure
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseAbnormalClosure, Text: ""}
|
||||
}
|
||||
ctx := wsReadClientCtx(mockWSConn)
|
||||
|
||||
<-ctx.Done()
|
||||
// closed ctx correctly
|
||||
|
||||
// mock function of conn.ReadMessage(), returns unexpected Close Error CloseAbnormalClosure
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, errors.New("error")
|
||||
}
|
||||
ctx2 := wsReadClientCtx(mockWSConn)
|
||||
<-ctx2.Done()
|
||||
// closed ctx correctly
|
||||
|
||||
// mock function of conn.ReadMessage(), returns unexpected Close Error CloseAbnormalClosure
|
||||
connReadMessageMock = func() (messageType int, p []byte, err error) {
|
||||
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseGoingAway, Text: ""}
|
||||
}
|
||||
ctx3 := wsReadClientCtx(mockWSConn)
|
||||
<-ctx3.Done()
|
||||
// closed ctx correctly
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user