Add get healthInfo api using websockets (#543)

Integrate also HealthInfo API with Console UI
This commit is contained in:
Cesar N
2021-01-13 14:43:34 -06:00
committed by GitHub
parent 1c109769df
commit d6aceb5430
32 changed files with 1814 additions and 399 deletions

View File

@@ -0,0 +1,94 @@
// This file is part of MinIO Console Server
// Copyright (c) 2021 MinIO, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package restapi
import (
"context"
"encoding/json"
"log"
"net/http"
"time"
"errors"
"github.com/gorilla/websocket"
madmin "github.com/minio/minio/pkg/madmin"
)
// startHealthInfo starts fetching mc.ServerHealthInfo and
// sends messages with the corresponding data on the websocket connection
func startHealthInfo(ctx context.Context, conn WSConn, client MinioAdmin, deadline *time.Duration) error {
if deadline == nil {
return errors.New("duration can't be nil on startHealthInfo")
}
// Fetch info of all servers (cluster or single server)
healthDataTypes := []madmin.HealthDataType{
madmin.HealthDataTypePerfDrive,
madmin.HealthDataTypePerfNet,
madmin.HealthDataTypeMinioInfo,
madmin.HealthDataTypeMinioConfig,
madmin.HealthDataTypeSysCPU,
madmin.HealthDataTypeSysDiskHw,
madmin.HealthDataTypeSysDocker,
madmin.HealthDataTypeSysOsInfo,
madmin.HealthDataTypeSysLoad,
madmin.HealthDataTypeSysMem,
madmin.HealthDataTypeSysNet,
madmin.HealthDataTypeSysProcess,
}
healthChan := client.serverHealthInfo(ctx, healthDataTypes, *deadline)
// wait for events to occur
for {
select {
// return if context ends
case <-ctx.Done():
return nil
case adminHealthInfo, ok := <-healthChan:
// zero value returned because the channel is closed and empty
if !ok {
return nil
}
if adminHealthInfo.Error != "" {
return errors.New(adminHealthInfo.Error)
}
// Serialize message to be sent
bytes, err := json.Marshal(adminHealthInfo)
if err != nil {
log.Println("error on json.Marshal:", err)
return err
}
// Send Message through websocket connection
err = conn.writeMessage(websocket.TextMessage, bytes)
if err != nil {
log.Println("error writeMessage:", err)
return err
}
}
}
}
// getHealthInfoOptionsFromReq gets duration for startHealthInfo request
// path come as : `/health-info?deadline=2h`
func getHealthInfoOptionsFromReq(req *http.Request) (*time.Duration, error) {
deadlineDuration, err := time.ParseDuration(req.FormValue("deadline"))
if err != nil {
return nil, err
}
return &deadlineDuration, nil
}

View File

@@ -0,0 +1,192 @@
// This file is part of MinIO Console Server
// Copyright (c) 2021 MinIO, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package restapi
import (
"context"
"encoding/json"
"errors"
"reflect"
"testing"
"time"
madmin "github.com/minio/minio/pkg/madmin"
)
// assigning mock at runtime instead of compile time
var minioServerHealthInfoMock func(ctx context.Context, healthDataTypes []madmin.HealthDataType, deadline time.Duration) <-chan madmin.HealthInfo
// mock function serverHealthInfo
func (ac adminClientMock) serverHealthInfo(ctx context.Context, healthDataTypes []madmin.HealthDataType, deadline time.Duration) <-chan madmin.HealthInfo {
return minioServerHealthInfoMock(ctx, healthDataTypes, deadline)
}
func Test_serverHealthInfo(t *testing.T) {
var testReceiver chan madmin.HealthInfo
ctx := context.Background()
client := adminClientMock{}
mockWSConn := mockConn{}
deadlineDuration, _ := time.ParseDuration("1h")
type args struct {
deadline time.Duration
wsWriteMock func(messageType int, data []byte) error
mockMessages []madmin.HealthInfo
}
tests := []struct {
test string
args args
wantError error
}{
{
test: "Return simple health info, no errors",
args: args{
deadline: deadlineDuration,
mockMessages: []madmin.HealthInfo{
madmin.HealthInfo{
Perf: madmin.PerfInfo{
NetParallel: madmin.ServerNetHealthInfo{
Addr: "someaddress",
},
},
},
madmin.HealthInfo{
Perf: madmin.PerfInfo{
NetParallel: madmin.ServerNetHealthInfo{
Addr: "otheraddress",
},
},
},
},
wsWriteMock: func(messageType int, data []byte) error {
// mock connection WriteMessage() no error
// emulate that receiver gets the message written
var t madmin.HealthInfo
_ = json.Unmarshal(data, &t)
testReceiver <- t
return nil
},
},
wantError: nil,
},
{
test: "Return simple health info2, no errors",
args: args{
deadline: deadlineDuration,
mockMessages: []madmin.HealthInfo{
madmin.HealthInfo{
Perf: madmin.PerfInfo{
NetParallel: madmin.ServerNetHealthInfo{
Addr: "address",
},
},
},
},
wsWriteMock: func(messageType int, data []byte) error {
// mock connection WriteMessage() no error
// emulate that receiver gets the message written
var t madmin.HealthInfo
_ = json.Unmarshal(data, &t)
testReceiver <- t
return nil
},
},
wantError: nil,
},
{
test: "Handle error on ws write",
args: args{
deadline: deadlineDuration,
mockMessages: []madmin.HealthInfo{
madmin.HealthInfo{
Perf: madmin.PerfInfo{
NetParallel: madmin.ServerNetHealthInfo{
Addr: "address",
},
},
},
},
wsWriteMock: func(messageType int, data []byte) error {
// mock connection WriteMessage() no error
// emulate that receiver gets the message written
var t madmin.HealthInfo
_ = json.Unmarshal(data, &t)
return errors.New("error on write")
},
},
wantError: errors.New("error on write"),
},
{
test: "Handle error on health function",
args: args{
deadline: deadlineDuration,
mockMessages: []madmin.HealthInfo{
madmin.HealthInfo{
Error: "error on healthInfo",
},
},
wsWriteMock: func(messageType int, data []byte) error {
// mock connection WriteMessage() no error
// emulate that receiver gets the message written
var t madmin.HealthInfo
_ = json.Unmarshal(data, &t)
return nil
},
},
wantError: errors.New("error on healthInfo"),
},
}
for _, tt := range tests {
t.Run(tt.test, func(t *testing.T) {
// make testReceiver channel
testReceiver = make(chan madmin.HealthInfo, len(tt.args.mockMessages))
// mock function same for all tests, changes mockMessages
minioServerHealthInfoMock = func(ctx context.Context, healthDataTypes []madmin.HealthDataType, deadline time.Duration) <-chan madmin.HealthInfo {
respChan := make(chan madmin.HealthInfo)
go func(ch chan madmin.HealthInfo) {
defer close(ch)
for _, info := range tt.args.mockMessages {
ch <- info
}
}(respChan)
return respChan
}
connWriteMessageMock = tt.args.wsWriteMock
err := startHealthInfo(ctx, mockWSConn, client, &deadlineDuration)
// close test mock channel
close(testReceiver)
// check that the TestReceiver got the same number of data from Console.
index := 0
for info := range testReceiver {
if !reflect.DeepEqual(info, tt.args.mockMessages[index]) {
t.Errorf("startHealthInfo() got: %v, want: %v", info, tt.args.mockMessages[index])
return
}
index++
}
if !reflect.DeepEqual(err, tt.wantError) {
t.Errorf("startHealthInfo() error: %v, wantError: %v", err, tt.wantError)
return
}
})
}
}

View File

@@ -22,6 +22,7 @@ import (
"net/http"
"path/filepath"
"runtime"
"time"
"github.com/minio/console/models"
mcCmd "github.com/minio/mc/cmd"
@@ -105,6 +106,8 @@ type MinioAdmin interface {
addRemoteBucket(ctx context.Context, bucket string, target *madmin.BucketTarget) (string, error)
// Account password management
changePassword(ctx context.Context, accessKey, secretKey string) error
serverHealthInfo(ctx context.Context, healthDataTypes []madmin.HealthDataType, deadline time.Duration) <-chan madmin.HealthInfo
}
// Interface implementation
@@ -286,11 +289,15 @@ func (ac adminClient) addRemoteBucket(ctx context.Context, bucket string, target
return ac.client.SetRemoteTarget(ctx, bucket, target)
}
// addRemoteBucket sets up a remote target for this bucket
func (ac adminClient) setBucketQuota(ctx context.Context, bucket string, quota *madmin.BucketQuota) error {
return ac.client.SetBucketQuota(ctx, bucket, quota)
}
// serverHealthInfo implements mc.ServerHealthInfo - Connect to a minio server and call Health Info Management API
func (ac adminClient) serverHealthInfo(ctx context.Context, healthDataTypes []madmin.HealthDataType, deadline time.Duration) <-chan madmin.HealthInfo {
return ac.client.ServerHealthInfo(ctx, healthDataTypes, deadline)
}
func newMAdminClient(sessionClaims *models.Principal) (*madmin.AdminClient, error) {
adminClient, err := newAdminFromClaims(sessionClaims)
if err != nil {

View File

@@ -80,7 +80,7 @@ func startWatch(ctx context.Context, conn WSConn, wsc MCClient, options *watchOp
// getWatchOptionsFromReq gets bucket name, events, prefix, suffix from a websocket
// watch path if defined.
// path come as : `/watch/<namespace>/<tenantName>/bucket1` and query
// path come as : `/watch/bucket1` and query
// params come on request form
func getWatchOptionsFromReq(req *http.Request) (*watchOptions, error) {
wOptions := watchOptions{}

View File

@@ -22,6 +22,7 @@ import (
"net"
"net/http"
"strings"
"time"
"github.com/go-openapi/errors"
"github.com/gorilla/websocket"
@@ -132,6 +133,19 @@ func serveWS(w http.ResponseWriter, req *http.Request) {
return
}
go wsAdminClient.console()
case strings.HasPrefix(wsPath, `/health-info`):
deadline, err := getHealthInfoOptionsFromReq(req)
if err != nil {
log.Println("error getting health info options:", err)
closeWsConn(conn)
return
}
wsAdminClient, err := newWebSocketAdminClient(conn, session)
if err != nil {
closeWsConn(conn)
return
}
go wsAdminClient.healthInfo(deadline)
case strings.HasPrefix(wsPath, `/heal`):
hOptions, err := getHealOptionsFromReq(req)
if err != nil {
@@ -308,10 +322,26 @@ func (wsc *wsAdminClient) heal(opts *healOptions) {
sendWsCloseMessage(wsc.conn, err)
}
func (wsc *wsAdminClient) healthInfo(deadline *time.Duration) {
defer func() {
log.Println("health info stopped")
// close connection after return
wsc.conn.close()
}()
log.Println("health info started")
ctx := wsReadClientCtx(wsc.conn)
err := startHealthInfo(ctx, wsc.conn, wsc.client, deadline)
sendWsCloseMessage(wsc.conn, err)
}
// sendWsCloseMessage sends Websocket Connection Close Message indicating the Status Code
// see https://tools.ietf.org/html/rfc6455#page-45
func sendWsCloseMessage(conn WSConn, err error) {
if err != nil {
log.Print("original ws error: ", err.Error())
// If connection exceeded read deadline send Close
// Message Policy Violation code since we don't want
// to let the receiver figure out the read deadline.
@@ -322,7 +352,7 @@ func sendWsCloseMessage(conn WSConn, err error) {
return
}
// else, internal server error
conn.writeMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error()))
conn.writeMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, errorGeneric.Error()))
return
}
// normal closure