Add Watch api and UI integration (#120)

Uses a similar approach as Trace and Console Logs by using
websockets. It also includes the integration with the UI which
needs 3 input fields that are sent as query parameters.
This commit is contained in:
César Nieto
2020-05-15 14:24:29 -07:00
committed by GitHub
parent acf480fd25
commit 6fef30f29d
23 changed files with 1169 additions and 236 deletions

View File

@@ -39,7 +39,6 @@ func TestAdminConsoleLog(t *testing.T) {
assert := assert.New(t)
adminClient := adminClientMock{}
mockWSConn := mockConn{}
wsClientMock := wsClientMock{madmin: adminClient}
function := "startConsoleLog()"
testReceiver := make(chan madmin.LogInfo, 5)
@@ -83,7 +82,7 @@ func TestAdminConsoleLog(t *testing.T) {
writesCount++
return nil
}
if err := startConsoleLog(mockWSConn, wsClientMock.madmin); err != nil {
if err := startConsoleLog(mockWSConn, adminClient); err != nil {
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
}
// check that the TestReceiver got the same number of data from Console.
@@ -95,7 +94,7 @@ func TestAdminConsoleLog(t *testing.T) {
connWriteMessageMock = func(messageType int, data []byte) error {
return fmt.Errorf("error on write")
}
if err := startConsoleLog(mockWSConn, wsClientMock.madmin); assert.Error(err) {
if err := startConsoleLog(mockWSConn, adminClient); assert.Error(err) {
assert.Equal("error on write", err.Error())
}
@@ -107,7 +106,7 @@ func TestAdminConsoleLog(t *testing.T) {
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseAbnormalClosure, Text: ""}
}
if err := startConsoleLog(mockWSConn, wsClientMock.madmin); assert.Error(err) {
if err := startConsoleLog(mockWSConn, adminClient); assert.Error(err) {
assert.Equal("websocket: close 1006 (abnormal closure)", err.Error())
}
@@ -116,7 +115,7 @@ func TestAdminConsoleLog(t *testing.T) {
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: ""}
}
if err := startConsoleLog(mockWSConn, wsClientMock.madmin); err != nil {
if err := startConsoleLog(mockWSConn, adminClient); err != nil {
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
}
@@ -125,7 +124,7 @@ func TestAdminConsoleLog(t *testing.T) {
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseGoingAway, Text: ""}
}
if err := startConsoleLog(mockWSConn, wsClientMock.madmin); err != nil {
if err := startConsoleLog(mockWSConn, adminClient); err != nil {
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
}
@@ -134,7 +133,7 @@ func TestAdminConsoleLog(t *testing.T) {
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, fmt.Errorf("error on read")
}
if err := startConsoleLog(mockWSConn, wsClientMock.madmin); assert.Error(err) {
if err := startConsoleLog(mockWSConn, adminClient); assert.Error(err) {
assert.Equal("error on read", err.Error())
}
@@ -162,7 +161,7 @@ func TestAdminConsoleLog(t *testing.T) {
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, nil
}
if err := startConsoleLog(mockWSConn, wsClientMock.madmin); assert.Error(err) {
if err := startConsoleLog(mockWSConn, adminClient); assert.Error(err) {
assert.Equal("error on Console", err.Error())
}
}

View File

@@ -40,7 +40,6 @@ func TestAdminTrace(t *testing.T) {
assert := assert.New(t)
adminClient := adminClientMock{}
mockWSConn := mockConn{}
wsClientMock := wsClientMock{madmin: adminClient}
function := "startTraceInfo()"
testReceiver := make(chan shortTraceMsg, 5)
@@ -84,7 +83,7 @@ func TestAdminTrace(t *testing.T) {
writesCount++
return nil
}
if err := startTraceInfo(mockWSConn, wsClientMock.madmin); err != nil {
if err := startTraceInfo(mockWSConn, adminClient); err != nil {
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
}
// check that the TestReceiver got the same number of data from trace.
@@ -96,7 +95,7 @@ func TestAdminTrace(t *testing.T) {
connWriteMessageMock = func(messageType int, data []byte) error {
return fmt.Errorf("error on write")
}
if err := startTraceInfo(mockWSConn, wsClientMock.madmin); assert.Error(err) {
if err := startTraceInfo(mockWSConn, adminClient); assert.Error(err) {
assert.Equal("error on write", err.Error())
}
@@ -108,7 +107,7 @@ func TestAdminTrace(t *testing.T) {
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseAbnormalClosure, Text: ""}
}
if err := startTraceInfo(mockWSConn, wsClientMock.madmin); assert.Error(err) {
if err := startTraceInfo(mockWSConn, adminClient); assert.Error(err) {
assert.Equal("websocket: close 1006 (abnormal closure)", err.Error())
}
@@ -117,7 +116,7 @@ func TestAdminTrace(t *testing.T) {
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: ""}
}
if err := startTraceInfo(mockWSConn, wsClientMock.madmin); err != nil {
if err := startTraceInfo(mockWSConn, adminClient); err != nil {
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
}
@@ -126,7 +125,7 @@ func TestAdminTrace(t *testing.T) {
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseGoingAway, Text: ""}
}
if err := startTraceInfo(mockWSConn, wsClientMock.madmin); err != nil {
if err := startTraceInfo(mockWSConn, adminClient); err != nil {
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
}
@@ -135,7 +134,7 @@ func TestAdminTrace(t *testing.T) {
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, fmt.Errorf("error on read")
}
if err := startTraceInfo(mockWSConn, wsClientMock.madmin); assert.Error(err) {
if err := startTraceInfo(mockWSConn, adminClient); assert.Error(err) {
assert.Equal("error on read", err.Error())
}
@@ -163,7 +162,7 @@ func TestAdminTrace(t *testing.T) {
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, nil
}
if err := startTraceInfo(mockWSConn, wsClientMock.madmin); assert.Error(err) {
if err := startTraceInfo(mockWSConn, adminClient); assert.Error(err) {
assert.Equal("error on trace", err.Error())
}
}

View File

@@ -19,6 +19,7 @@ package restapi
import (
"context"
"fmt"
"strings"
"errors"
@@ -93,6 +94,7 @@ func (c minioClient) getBucketPolicy(bucketName string) (string, error) {
type MCS3Client interface {
addNotificationConfig(arn string, events []string, prefix, suffix string, ignoreExisting bool) *probe.Error
removeNotificationConfig(arn string, event string, prefix string, suffix string) *probe.Error
watch(options mc.WatchOptions) (*mc.WatchObject, *probe.Error)
}
// Interface implementation
@@ -113,6 +115,10 @@ func (c mcS3Client) removeNotificationConfig(arn string, event string, prefix st
return c.client.RemoveNotificationConfig(arn, event, prefix, suffix)
}
func (c mcS3Client) watch(options mc.WatchOptions) (*mc.WatchObject, *probe.Error) {
return c.client.Watch(options)
}
// MCSCredentials interface with all functions to be implemented
// by mock when testing, it should include all needed minioCredentials.Credentials api calls
// that are used within this project.
@@ -224,19 +230,23 @@ func newMinioClient(jwt string) (*minio.Client, error) {
}
// newS3BucketClient creates a new mc S3Client to talk to the server based on a bucket
func newS3BucketClient(bucketName *string) (*mc.S3Client, error) {
func newS3BucketClient(jwt string, bucketName string) (*mc.S3Client, error) {
endpoint := getMinIOServer()
accessKeyID := getAccessKey()
secretAccessKey := getSecretKey()
useSSL := getMinIOEndpointIsSecure()
if bucketName != nil {
endpoint += fmt.Sprintf("/%s", *bucketName)
}
s3Config := newS3Config(endpoint, accessKeyID, secretAccessKey, !useSSL)
client, err := mc.S3New(s3Config)
claims, err := auth.JWTAuthenticate(jwt)
if err != nil {
return nil, err.Cause
return nil, err
}
if strings.TrimSpace(bucketName) != "" {
endpoint += fmt.Sprintf("/%s", bucketName)
}
s3Config := newS3Config(endpoint, claims.AccessKeyID, claims.SecretAccessKey, claims.SessionToken, !useSSL)
client, pErr := mc.S3New(s3Config)
if pErr != nil {
return nil, pErr.Cause
}
s3Client, ok := client.(*mc.S3Client)
if !ok {
@@ -248,7 +258,7 @@ func newS3BucketClient(bucketName *string) (*mc.S3Client, error) {
// newS3Config simply creates a new Config struct using the passed
// parameters.
func newS3Config(endpoint, accessKey, secretKey string, isSecure bool) *mc.Config {
func newS3Config(endpoint, accessKey, secretKey, sessionToken string, isSecure bool) *mc.Config {
// We have a valid alias and hostConfig. We populate the
// minioCredentials from the match found in the config file.
s3Config := new(mc.Config)
@@ -262,6 +272,7 @@ func newS3Config(endpoint, accessKey, secretKey string, isSecure bool) *mc.Confi
s3Config.HostURL = endpoint
s3Config.AccessKey = accessKey
s3Config.SecretKey = secretKey
s3Config.SessionToken = sessionToken
s3Config.Signature = "S3v4"
return s3Config
}

View File

@@ -40,14 +40,16 @@ func registerBucketEventsHandlers(api *operations.McsAPI) {
})
// create bucket event
api.UserAPICreateBucketEventHandler = user_api.CreateBucketEventHandlerFunc(func(params user_api.CreateBucketEventParams, principal *models.Principal) middleware.Responder {
if err := getCreateBucketEventsResponse(params.BucketName, params.Body); err != nil {
sessionID := string(*principal)
if err := getCreateBucketEventsResponse(sessionID, params.BucketName, params.Body); err != nil {
return user_api.NewCreateBucketEventDefault(500).WithPayload(&models.Error{Code: 500, Message: swag.String(err.Error())})
}
return user_api.NewCreateBucketEventCreated()
})
// delete bucket event
api.UserAPIDeleteBucketEventHandler = user_api.DeleteBucketEventHandlerFunc(func(params user_api.DeleteBucketEventParams, principal *models.Principal) middleware.Responder {
if err := getDeleteBucketEventsResponse(params.BucketName, params.Arn, params.Body.Events, params.Body.Prefix, params.Body.Suffix); err != nil {
sessionID := string(*principal)
if err := getDeleteBucketEventsResponse(sessionID, params.BucketName, params.Arn, params.Body.Events, params.Body.Prefix, params.Body.Suffix); err != nil {
return user_api.NewDeleteBucketEventDefault(500).WithPayload(&models.Error{Code: 500, Message: swag.String(err.Error())})
}
return user_api.NewDeleteBucketEventNoContent()
@@ -178,8 +180,8 @@ func createBucketEvent(client MCS3Client, arn string, notificationEvents []model
}
// getCreateBucketEventsResponse calls createBucketEvent to add a bucket event notification
func getCreateBucketEventsResponse(bucketName string, eventReq *models.BucketEventRequest) error {
s3Client, err := newS3BucketClient(swag.String(bucketName))
func getCreateBucketEventsResponse(sessionID, bucketName string, eventReq *models.BucketEventRequest) error {
s3Client, err := newS3BucketClient(sessionID, bucketName)
if err != nil {
log.Println("error creating S3Client:", err)
return err
@@ -214,8 +216,8 @@ func joinNotificationEvents(events []models.NotificationEventType) string {
}
// getDeleteBucketEventsResponse calls deleteBucketEventNotification() to delete a bucket event notification
func getDeleteBucketEventsResponse(bucketName string, arn string, events []models.NotificationEventType, prefix, suffix *string) error {
s3Client, err := newS3BucketClient(swag.String(bucketName))
func getDeleteBucketEventsResponse(sessionID, bucketName string, arn string, events []models.NotificationEventType, prefix, suffix *string) error {
s3Client, err := newS3BucketClient(sessionID, bucketName)
if err != nil {
log.Println("error creating S3Client:", err)
return err

165
restapi/user_watch.go Normal file
View File

@@ -0,0 +1,165 @@
// This file is part of MinIO Console Server
// Copyright (c) 2020 MinIO, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package restapi
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"regexp"
"strings"
"sync"
"github.com/gorilla/websocket"
mc "github.com/minio/mc/cmd"
)
type watchOptions struct {
BucketName string
mc.WatchOptions
}
// startWatch starts by setting a websocket reader that
// will check for a heartbeat.
//
// A WaitGroup is used to handle goroutines and to ensure
// all finish in the proper order. If any, sendWatchInfo()
// or wsReadCheck() returns, watch should end.
func startWatch(conn WSConn, client MCS3Client, options watchOptions) (mError error) {
// a WaitGroup waits for a collection of goroutines to finish
wg := sync.WaitGroup{}
// a cancel context is needed to end all goroutines used
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Set number of goroutines to wait. wg.Wait()
// waits until counter is zero (all are done)
wg.Add(3)
// start go routine for reading websocket heartbeat
readErr := wsReadCheck(ctx, &wg, conn)
// send Stream of watch events to the ws c.connection
ch := sendWatchInfo(ctx, &wg, conn, client, options)
// If wsReadCheck returns it means that it is not possible to check
// ws heartbeat anymore so we stop from doing Watch, cancel context
// for all goroutines.
go func(wg *sync.WaitGroup) {
defer wg.Done()
if err := <-readErr; err != nil {
log.Println("error on wsReadCheck:", err)
mError = err
}
// cancel context for all goroutines.
cancel()
}(&wg)
if err := <-ch; err != nil {
mError = err
}
// if ch closes for any reason,
// cancel context for all goroutines
cancel()
// wait all goroutines to finish
wg.Wait()
return mError
}
// sendWatchInfo sends stream of Watch Event to the ws connection
func sendWatchInfo(ctx context.Context, wg *sync.WaitGroup, conn WSConn, wsc MCS3Client, options watchOptions) <-chan error {
// decrements the WaitGroup counter
// by one when the function returns
defer wg.Done()
ch := make(chan error)
go func(ch chan<- error) {
defer close(ch)
wo, pErr := wsc.watch(options.WatchOptions)
if pErr != nil {
fmt.Println("error initializing watch:", pErr.Cause)
ch <- pErr.Cause
return
}
for {
select {
case <-ctx.Done():
close(wo.DoneChan)
return
case events, ok := <-wo.Events():
// zero value returned because the channel is closed and empty
if !ok {
return
}
for _, event := range events {
// Serialize message to be sent
bytes, err := json.Marshal(event)
if err != nil {
fmt.Println("error on json.Marshal:", err)
ch <- err
return
}
// Send Message through websocket connection
err = conn.writeMessage(websocket.TextMessage, bytes)
if err != nil {
log.Println("error writeMessage:", err)
ch <- err
return
}
}
case pErr, ok := <-wo.Errors():
// zero value returned because the channel is closed and empty
if !ok {
return
}
if pErr != nil {
log.Println("error on watch:", pErr.Cause)
ch <- pErr.Cause
return
}
}
}
}(ch)
return ch
}
// getOptionsFromReq gets bucket name, events, prefix, suffix from a websocket
// watch path if defined.
// path come as : `/watch/bucket1` and query params come on request form
func getOptionsFromReq(req *http.Request) watchOptions {
wOptions := watchOptions{}
// Default Events if not defined
wOptions.Events = []string{"put", "get", "delete"}
re := regexp.MustCompile(`(/watch/)(.*?$)`)
matches := re.FindAllSubmatch([]byte(req.URL.Path), -1)
// len matches is always 3
// matches comes as e.g.
// [["...", "/watch/" "bucket1"]]
// [["/watch/" "/watch/" ""]]
// bucket name is on the second group, third position
wOptions.BucketName = strings.TrimSpace(string(matches[0][2]))
events := req.FormValue("events")
if strings.TrimSpace(events) != "" {
wOptions.Events = strings.Split(events, ",")
}
wOptions.Prefix = req.FormValue("prefix")
wOptions.Suffix = req.FormValue("suffix")
return wOptions
}

291
restapi/user_watch_test.go Normal file
View File

@@ -0,0 +1,291 @@
// This file is part of MinIO Console Server
// Copyright (c) 2020 MinIO, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package restapi
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"testing"
"github.com/gorilla/websocket"
mc "github.com/minio/mc/cmd"
"github.com/minio/mc/pkg/probe"
"github.com/stretchr/testify/assert"
)
// assigning mock at runtime instead of compile time
var mcWatchMock func(options mc.WatchOptions) (*mc.WatchObject, *probe.Error)
// implements mc.S3Client.Watch()
func (c s3ClientMock) watch(options mc.WatchOptions) (*mc.WatchObject, *probe.Error) {
return mcWatchMock(options)
}
func TestWatch(t *testing.T) {
assert := assert.New(t)
client := s3ClientMock{}
mockWSConn := mockConn{}
function := "startWatch()"
testStreamSize := 5
testReceiver := make(chan []mc.EventInfo, testStreamSize)
textToReceive := "test message"
testOptions := watchOptions{}
testOptions.BucketName = "bucktest"
testOptions.Prefix = "file/"
testOptions.Suffix = ".png"
// Test-1: Serve Watch with no errors until Watch finishes sending
// define mock function behavior
mcWatchMock = func(params mc.WatchOptions) (*mc.WatchObject, *probe.Error) {
wo := &mc.WatchObject{
EventInfoChan: make(chan []mc.EventInfo),
ErrorChan: make(chan *probe.Error),
DoneChan: make(chan struct{}),
}
// Only success, start a routine to start reading line by line.
go func(wo *mc.WatchObject) {
defer wo.Close()
lines := make([]int, testStreamSize)
// mocking sending 5 lines of info
for range lines {
info := []mc.EventInfo{
mc.EventInfo{
UserAgent: textToReceive,
},
}
wo.Events() <- info
}
}(wo)
return wo, nil
}
// mock function of conn.ReadMessage(), no error on read
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, nil
}
writesCount := 1
// mock connection WriteMessage() no error
connWriteMessageMock = func(messageType int, data []byte) error {
// emulate that receiver gets the message written
var t []mc.EventInfo
_ = json.Unmarshal(data, &t)
if writesCount == testStreamSize {
// for testing we need to close the receiver channel
close(testReceiver)
return nil
}
testReceiver <- t
writesCount++
return nil
}
if err := startWatch(mockWSConn, client, testOptions); err != nil {
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
}
// check that the TestReceiver got the same number of data from Console.
for i := range testReceiver {
for _, val := range i {
assert.Equal(textToReceive, val.UserAgent)
}
}
// Test-2: if error happens while writing, return error
connWriteMessageMock = func(messageType int, data []byte) error {
return fmt.Errorf("error on write")
}
if err := startWatch(mockWSConn, client, testOptions); assert.Error(err) {
assert.Equal("error on write", err.Error())
}
// Test-3: error happens while reading, unexpected Close Error should return error.
connWriteMessageMock = func(messageType int, data []byte) error {
return nil
}
// mock function of conn.ReadMessage(), returns unexpected Close Error CloseAbnormalClosure
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseAbnormalClosure, Text: ""}
}
if err := startWatch(mockWSConn, client, testOptions); assert.Error(err) {
assert.Equal("websocket: close 1006 (abnormal closure)", err.Error())
}
// Test-4: error happens while reading, expected Close Error NormalClosure
// expected Close Error should not return an error, just end Console.
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: ""}
}
if err := startWatch(mockWSConn, client, testOptions); err != nil {
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
}
// Test-5: error happens while reading, expected Close Error CloseGoingAway
// expected Close Error should not return an error, just return.
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseGoingAway, Text: ""}
}
if err := startWatch(mockWSConn, client, testOptions); err != nil {
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
}
// Test-6: error happens while reading, non Close Error Type should be returned as
// error
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, fmt.Errorf("error on read")
}
if err := startWatch(mockWSConn, client, testOptions); assert.Error(err) {
assert.Equal("error on read", err.Error())
}
// Test-7: error happens on Watch, watch should stop
// and error shall be returned.
mcWatchMock = func(params mc.WatchOptions) (*mc.WatchObject, *probe.Error) {
wo := &mc.WatchObject{
EventInfoChan: make(chan []mc.EventInfo),
ErrorChan: make(chan *probe.Error),
DoneChan: make(chan struct{}),
}
// Only success, start a routine to start reading line by line.
go func(wo *mc.WatchObject) {
defer wo.Close()
lines := make([]int, testStreamSize)
// mocking sending 5 lines of info
for range lines {
info := []mc.EventInfo{
mc.EventInfo{
UserAgent: textToReceive,
},
}
wo.Events() <- info
}
wo.Errors() <- &probe.Error{Cause: fmt.Errorf("error on Watch")}
}(wo)
return wo, nil
}
// mock function of conn.ReadMessage(), no error on read, should stay unless
// context is done.
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, nil
}
if err := startWatch(mockWSConn, client, testOptions); assert.Error(err) {
assert.Equal("error on Watch", err.Error())
}
// Test-8: error happens on Watch, watch should stop
// and error shall be returned.
mcWatchMock = func(params mc.WatchOptions) (*mc.WatchObject, *probe.Error) {
return nil, &probe.Error{Cause: fmt.Errorf("error on Watch")}
}
if err := startWatch(mockWSConn, client, testOptions); assert.Error(err) {
assert.Equal("error on Watch", err.Error())
}
// Test-9: return nil on error on Watch
mcWatchMock = func(params mc.WatchOptions) (*mc.WatchObject, *probe.Error) {
wo := &mc.WatchObject{
EventInfoChan: make(chan []mc.EventInfo),
ErrorChan: make(chan *probe.Error),
DoneChan: make(chan struct{}),
}
// Only success, start a routine to start reading line by line.
go func(wo *mc.WatchObject) {
defer wo.Close()
lines := make([]int, testStreamSize)
// mocking sending 5 lines of info
for range lines {
info := []mc.EventInfo{
mc.EventInfo{
UserAgent: textToReceive,
},
}
wo.Events() <- info
}
wo.Events() <- nil
wo.Errors() <- nil
}(wo)
return wo, nil
}
if err := startWatch(mockWSConn, client, testOptions); err != nil {
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
}
// check that the TestReceiver got the same number of data from Console.
for i := range testReceiver {
for _, val := range i {
assert.Equal(textToReceive, val.UserAgent)
}
}
// Test-9: getOptionsFromReq return parameters from path
u, err := url.Parse("http://localhost/api/v1/watch/bucket1?prefix=&suffix=.jpg&events=put,get")
if err != nil {
t.Errorf("Failed on %s:, error occurred: %s", "url.Parse()", err.Error())
}
req := &http.Request{
URL: u,
}
opts := getOptionsFromReq(req)
expectedOptions := watchOptions{
BucketName: "bucket1",
}
expectedOptions.Prefix = ""
expectedOptions.Suffix = ".jpg"
expectedOptions.Events = []string{"put", "get"}
assert.Equal(expectedOptions.BucketName, opts.BucketName)
assert.Equal(expectedOptions.Prefix, opts.Prefix)
assert.Equal(expectedOptions.Suffix, opts.Suffix)
assert.Equal(expectedOptions.Events, opts.Events)
// Test-9: getOptionsFromReq return default events if not defined
u, err = url.Parse("http://localhost/api/v1/watch/bucket1?prefix=&suffix=.jpg&events=")
if err != nil {
t.Errorf("Failed on %s:, error occurred: %s", "url.Parse()", err.Error())
}
req = &http.Request{
URL: u,
}
opts = getOptionsFromReq(req)
expectedOptions = watchOptions{
BucketName: "bucket1",
}
expectedOptions.Prefix = ""
expectedOptions.Suffix = ".jpg"
expectedOptions.Events = []string{"put", "get", "delete"}
assert.Equal(expectedOptions.BucketName, opts.BucketName)
assert.Equal(expectedOptions.Prefix, opts.Prefix)
assert.Equal(expectedOptions.Suffix, opts.Suffix)
assert.Equal(expectedOptions.Events, opts.Events)
// Test-10: getOptionsFromReq return default events if not defined
u, err = url.Parse("http://localhost/api/v1/watch/bucket2?prefix=&suffix=")
if err != nil {
t.Errorf("Failed on %s:, error occurred: %s", "url.Parse()", err.Error())
}
req = &http.Request{
URL: u,
}
opts = getOptionsFromReq(req)
expectedOptions = watchOptions{
BucketName: "bucket2",
}
expectedOptions.Events = []string{"put", "get", "delete"}
assert.Equal(expectedOptions.BucketName, opts.BucketName)
assert.Equal(expectedOptions.Prefix, opts.Prefix)
assert.Equal(expectedOptions.Suffix, opts.Suffix)
assert.Equal(expectedOptions.Events, opts.Events)
}

View File

@@ -27,6 +27,7 @@ import (
"github.com/go-openapi/errors"
"github.com/gorilla/websocket"
"github.com/minio/mcs/pkg/auth"
"github.com/minio/mcs/pkg/ws"
)
@@ -46,17 +47,29 @@ const (
maxMessageSize = 512
)
// MCSWebsocket interface of a Websocket Client
type MCSWebsocket interface {
// start trace info from servers
// MCSWebsocketAdmin interface of a Websocket Client
type MCSWebsocketAdmin interface {
trace()
console()
}
type wsClient struct {
type wsAdminClient struct {
// websocket connection.
conn wsConn
// MinIO admin Client
madmin MinioAdmin
client MinioAdmin
}
// MCSWebsocket interface of a Websocket Client
type MCSWebsocket interface {
watch(options watchOptions)
}
type wsS3Client struct {
// websocket connection.
conn wsConn
// mcS3Client
client MCS3Client
}
// WSConn interface with all functions to be implemented
@@ -106,14 +119,18 @@ func (c wsConn) readMessage() (messageType int, p []byte, err error) {
// Websocket communication will be done depending
// on the path.
// Request should come like ws://<host>:<port>/ws/<api>
//
// TODO: Enable CORS
func serveWS(w http.ResponseWriter, req *http.Request) {
sessionID, err := ws.GetTokenFromRequest(req)
if err != nil {
errors.ServeError(w, req, err)
return
}
// Perform authentication before upgrading to a Websocket Connection
// authenticate WS connection with MCS
claims, err := ws.Authenticate(req)
claims, err := auth.JWTAuthenticate(*sessionID)
if err != nil {
log.Print("error on ws authentication: ", err)
errors.ServeError(w, req, err)
errors.ServeError(w, req, errors.New(http.StatusUnauthorized, err.Error()))
return
}
@@ -125,28 +142,30 @@ func serveWS(w http.ResponseWriter, req *http.Request) {
return
}
// Only start Websocket Interaction after user has been
// authenticated with MinIO
mAdmin, err := newAdminFromClaims(claims)
if err != nil {
log.Println("error creating Madmin Client:", err)
errors.ServeError(w, req, err)
return
}
// create a minioClient interface implementation
// defining the client to be used
adminClient := adminClient{client: mAdmin}
// create a websocket connection interface implementation
// defining the connection to be used
wsConnection := wsConn{conn: conn}
// create websocket client and handle request
wsClient := &wsClient{conn: wsConnection, madmin: adminClient}
switch strings.TrimPrefix(req.URL.Path, wsBasePath) {
case "/trace":
go wsClient.trace()
case "/console":
go wsClient.console()
wsPath := strings.TrimPrefix(req.URL.Path, wsBasePath)
switch {
case wsPath == "/trace":
wsAdminClient, err := newWebSocketAdminClient(conn, claims)
if err != nil {
errors.ServeError(w, req, err)
return
}
go wsAdminClient.trace()
case wsPath == "/console":
wsAdminClient, err := newWebSocketAdminClient(conn, claims)
if err != nil {
errors.ServeError(w, req, err)
return
}
go wsAdminClient.console()
case strings.HasPrefix(wsPath, `/watch`):
wOptions := getOptionsFromReq(req)
wsS3Client, err := newWebSocketS3Client(conn, *sessionID, wOptions.BucketName)
if err != nil {
errors.ServeError(w, req, err)
return
}
go wsS3Client.watch(wOptions)
default:
// path not found
conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
@@ -154,6 +173,51 @@ func serveWS(w http.ResponseWriter, req *http.Request) {
}
}
// newWebSocketAdminClient returns a wsAdminClient authenticated as an admin user
func newWebSocketAdminClient(conn *websocket.Conn, autClaims *auth.DecryptedClaims) (*wsAdminClient, error) {
// Only start Websocket Interaction after user has been
// authenticated with MinIO
mAdmin, err := newAdminFromClaims(autClaims)
if err != nil {
log.Println("error creating Madmin Client:", err)
// close connection
conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
conn.Close()
return nil, err
}
// create a websocket connection interface implementation
// defining the connection to be used
wsConnection := wsConn{conn: conn}
// create a minioClient interface implementation
// defining the client to be used
adminClient := adminClient{client: mAdmin}
// create websocket client and handle request
wsAdminClient := &wsAdminClient{conn: wsConnection, client: adminClient}
return wsAdminClient, nil
}
// newWebSocketS3Client returns a wsAdminClient authenticated as MCS admin
func newWebSocketS3Client(conn *websocket.Conn, jwt, bucketName string) (*wsS3Client, error) {
// Only start Websocket Interaction after user has been
// authenticated with MinIO
s3Client, err := newS3BucketClient(jwt, bucketName)
if err != nil {
log.Println("error creating S3Client:", err)
conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
conn.Close()
return nil, err
}
// create a websocket connection interface implementation
// defining the connection to be used
wsConnection := wsConn{conn: conn}
// create a s3Client interface implementation
// defining the client to be used
mcS3C := mcS3Client{client: s3Client}
// create websocket client and handle request
wsS3Client := &wsS3Client{conn: wsConnection, client: mcS3C}
return wsS3Client, nil
}
// wsReadCheck ensures that the client is sending a message
// every `pingWait` seconds. If deadline exceeded or an error
// happened this will return, meaning it won't be able to ensure
@@ -206,7 +270,7 @@ func wsReadCheck(ctx context.Context, wg *sync.WaitGroup, conn WSConn) chan erro
// trace serves madmin.ServiceTraceInfo
// on a Websocket connection.
func (wsc *wsClient) trace() {
func (wsc *wsAdminClient) trace() {
defer func() {
log.Println("trace stopped")
// close connection after return
@@ -214,7 +278,7 @@ func (wsc *wsClient) trace() {
}()
log.Println("trace started")
err := startTraceInfo(wsc.conn, wsc.madmin)
err := startTraceInfo(wsc.conn, wsc.client)
// Send Connection Close Message indicating the Status Code
// see https://tools.ietf.org/html/rfc6455#page-45
if err != nil {
@@ -237,7 +301,7 @@ func (wsc *wsClient) trace() {
// console serves madmin.GetLogs
// on a Websocket connection.
func (wsc *wsClient) console() {
func (wsc *wsAdminClient) console() {
defer func() {
log.Println("console logs stopped")
// close connection after return
@@ -245,7 +309,36 @@ func (wsc *wsClient) console() {
}()
log.Println("console logs started")
err := startConsoleLog(wsc.conn, wsc.madmin)
err := startConsoleLog(wsc.conn, wsc.client)
// Send Connection Close Message indicating the Status Code
// see https://tools.ietf.org/html/rfc6455#page-45
if err != nil {
// If connection exceeded read deadline send Close
// Message Policy Violation code since we don't want
// to let the receiver figure out the read deadline.
// This is a generic code designed if there is a
// need to hide specific details about the policy.
if nErr, ok := err.(net.Error); ok && nErr.Timeout() {
wsc.conn.writeMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.ClosePolicyViolation, ""))
return
}
// else, internal server error
wsc.conn.writeMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error()))
return
}
// normal closure
wsc.conn.writeMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
}
func (wsc *wsS3Client) watch(params watchOptions) {
defer func() {
log.Println("watch stopped")
// close connection after return
wsc.conn.close()
}()
log.Println("watch started")
err := startWatch(wsc.conn, wsc.client, params)
// Send Connection Close Message indicating the Status Code
// see https://tools.ietf.org/html/rfc6455#page-45
if err != nil {

View File

@@ -51,18 +51,3 @@ func (c mockConn) setReadDeadline(t time.Time) error {
}
func (c mockConn) setPongHandler(h func(appData string) error) {
}
// Common mocks for MCSWebsocket interface
// assigning mock at runtime instead of compile time
var wsTraceMock func()
// Define a mock struct of wsClient interface implementation
type wsClientMock struct {
// MinIO admin Client
madmin MinioAdmin
}
// mock function of wsc.trace()
func (wsc wsClientMock) trace() {
wsTraceMock()
}