Implement WebSockets for Profile download (#2190)

This commit is contained in:
Kaan Kabalak
2022-07-26 18:40:24 -07:00
committed by GitHub
parent db07f546a4
commit 51a8bacc18
6 changed files with 155 additions and 213 deletions

View File

@@ -20,18 +20,16 @@ import (
"archive/zip"
"bytes"
"fmt"
"io"
"log"
"net/http"
"strings"
"testing"
"time"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
)
func TestStartProfiling(t *testing.T) {
testAsser := assert.New(t)
testAssert := assert.New(t)
tests := []struct {
name string
@@ -58,53 +56,28 @@ func TestStartProfiling(t *testing.T) {
"profile-127.0.0.1:9000-mutex-before.pprof": false,
}
client := &http.Client{
Timeout: 3 * time.Second,
}
wsDestination := "/ws/profile?types=cpu,mem,block,mutex,trace,threads,goroutines"
wsFinalURL := fmt.Sprintf("ws://localhost:9090%s", wsDestination)
destination := "/api/v1/profiling/start"
finalURL := fmt.Sprintf("http://localhost:9090%s", destination)
request, err := http.NewRequest("POST", finalURL, strings.NewReader("{\"type\":\"cpu,mem,block,mutex,trace,threads,goroutines\"}"))
ws, _, err := websocket.DefaultDialer.Dial(wsFinalURL, nil)
if err != nil {
log.Println(err)
return
}
defer ws.Close()
_, zipFileBytes, err := ws.ReadMessage()
if err != nil {
log.Println(err)
return
}
request.Header.Add("Cookie", fmt.Sprintf("token=%s", token))
request.Header.Add("Content-Type", "application/json")
response, err := client.Do(request)
testAsser.Nil(err, fmt.Sprintf("%s returned an error: %v", tt.name, err))
testAsser.Equal(201, response.StatusCode)
time.Sleep(5 * time.Second)
destination = "/api/v1/profiling/stop"
finalURL = fmt.Sprintf("http://localhost:9090%s", destination)
request, err = http.NewRequest("POST", finalURL, nil)
if err != nil {
log.Println(err)
return
}
request.Header.Add("Cookie", fmt.Sprintf("token=%s", token))
request.Header.Add("Content-Type", "application/json")
response, err = client.Do(request)
testAsser.Nil(err, fmt.Sprintf("%s returned an error: %v", tt.name, err))
testAsser.Equal(200, response.StatusCode)
zipFileBytes, err := io.ReadAll(response.Body)
if err != nil {
testAsser.Nil(err, fmt.Sprintf("%s returned an error: %v", tt.name, err))
}
filetype := http.DetectContentType(zipFileBytes)
testAsser.Equal("application/zip", filetype)
testAssert.Equal("application/zip", filetype)
zipReader, err := zip.NewReader(bytes.NewReader(zipFileBytes), int64(len(zipFileBytes)))
if err != nil {
testAsser.Nil(err, fmt.Sprintf("%s returned an error: %v", tt.name, err))
testAssert.Nil(err, fmt.Sprintf("%s returned an error: %v", tt.name, err))
}
// Read all the files from zip archive
@@ -113,7 +86,7 @@ func TestStartProfiling(t *testing.T) {
}
for k, v := range files {
testAsser.Equal(true, v, fmt.Sprintf("%s : compressed file expected to have %v file inside", tt.name, k))
testAssert.Equal(true, v, fmt.Sprintf("%s : compressed file expected to have %v file inside", tt.name, k))
}
})
}

View File

@@ -1,4 +1,5 @@
import React, { Fragment, useState } from "react";
import { IMessageEvent, w3cwebsocket as W3CWebSocket } from "websocket";
import { Theme } from "@mui/material/styles";
import createStyles from "@mui/styles/createStyles";
import withStyles from "@mui/styles/withStyles";
@@ -6,8 +7,7 @@ import { Button, Grid } from "@mui/material";
import PageHeader from "../Common/PageHeader/PageHeader";
import PageLayout from "../Common/Layout/PageLayout";
import CheckboxWrapper from "../Common/FormComponents/CheckboxWrapper/CheckboxWrapper";
import api from "../../../common/api";
import { ErrorResponseHandler } from "../../../common/types";
import { wsProtocol } from "../../../utils/wsUtils";
import {
actionsTray,
containerForHeader,
@@ -52,6 +52,8 @@ interface IProfileProps {
classes: any;
}
var c: any = null;
const Profile = ({ classes }: IProfileProps) => {
const [profilingStarted, setProfilingStarted] = useState<boolean>(false);
const [types, setTypes] = useState<string[]>([
@@ -85,43 +87,53 @@ const Profile = ({ classes }: IProfileProps) => {
};
const startProfiling = () => {
if (!profilingStarted) {
const typeString = types.join(",");
setProfilingStarted(true);
api
.invoke("POST", `/api/v1/profiling/start`, {
type: typeString,
})
.then(() => {})
.catch((err: ErrorResponseHandler) => {
console.log(err);
setProfilingStarted(false);
});
const typeString = types.join(",");
const url = new URL(window.location.toString());
const isDev = process.env.NODE_ENV === "development";
const port = isDev ? "9090" : url.port;
// check if we are using base path, if not this always is `/`
const baseLocation = new URL(document.baseURI);
const baseUrl = baseLocation.pathname;
const wsProt = wsProtocol(url.protocol);
c = new W3CWebSocket(
`${wsProt}://${url.hostname}:${port}${baseUrl}ws/profile?types=${typeString}`
);
if (c !== null) {
c.onopen = () => {
setProfilingStarted(true);
c.send("ok");
};
c.onmessage = (message: IMessageEvent) => {
// process received message
let response = new Blob([message.data], { type: "application/zip" });
let filename = "profile.zip";
setProfilingStarted(false);
var link = document.createElement("a");
link.href = window.URL.createObjectURL(response);
link.download = filename;
document.body.appendChild(link);
link.click();
document.body.removeChild(link);
};
c.onclose = () => {
console.log("connection closed by server");
setProfilingStarted(false);
};
return () => {
c.close(1000);
console.log("closing websockets");
setProfilingStarted(false);
};
}
};
const stopProfiling = () => {
if (profilingStarted) {
const anchor = document.createElement("a");
document.body.appendChild(anchor);
let path = "/api/v1/profiling/stop";
var req = new XMLHttpRequest();
req.open("POST", path, true);
req.responseType = "blob";
req.onreadystatechange = () => {
if (req.readyState === 4 && req.status === 200) {
let filename = "profile.zip";
setProfilingStarted(false);
var link = document.createElement("a");
link.href = window.URL.createObjectURL(req.response);
link.download = filename;
document.body.appendChild(link);
link.click();
document.body.removeChild(link);
}
};
req.send();
}
c.close(1000);
setProfilingStarted(false);
};
return (

View File

@@ -18,59 +18,32 @@ package restapi
import (
"context"
"io"
"io/ioutil"
"net/http"
"github.com/go-openapi/runtime"
"github.com/go-openapi/runtime/middleware"
"github.com/gorilla/websocket"
"github.com/minio/console/models"
"github.com/minio/console/restapi/operations"
profileApi "github.com/minio/console/restapi/operations/profile"
"github.com/minio/madmin-go"
)
func registerProfilingHandler(api *operations.ConsoleAPI) {
// Start Profiling
api.ProfileProfilingStartHandler = profileApi.ProfilingStartHandlerFunc(func(params profileApi.ProfilingStartParams, session *models.Principal) middleware.Responder {
profilingStartResponse, err := getProfilingStartResponse(session, params)
if err != nil {
return profileApi.NewProfilingStartDefault(int(err.Code)).WithPayload(err)
}
return profileApi.NewProfilingStartCreated().WithPayload(profilingStartResponse)
})
// Stop and download profiling data
api.ProfileProfilingStopHandler = profileApi.ProfilingStopHandlerFunc(func(params profileApi.ProfilingStopParams, session *models.Principal) middleware.Responder {
profilingStopResponse, err := getProfilingStopResponse(session, params)
if err != nil {
return profileApi.NewProfilingStopDefault(int(err.Code)).WithPayload(err)
}
// Custom response writer to set the content-disposition header to tell the
// HTTP client the name and extension of the file we are returning
return middleware.ResponderFunc(func(w http.ResponseWriter, _ runtime.Producer) {
defer profilingStopResponse.Close()
w.Header().Set("Content-Type", "application/zip")
w.Header().Set("Content-Disposition", "attachment; filename=profile.zip")
io.Copy(w, profilingStopResponse)
})
})
var items []*models.StartProfilingItem
type profileOptions struct {
Types string
}
// startProfiling() starts the profiling on the Minio server
// Enable 1 of the 7 profiling mechanisms: "cpu","mem","block","mutex","trace","threads","goroutines"
// in the Minio server, returns []*models.StartProfilingItem that contains individual status of this operation
// for each Minio node, ie:
//
// {
// "Success": true,
// "nodeName": "127.0.0.1:9000"
// "errors": ""
// }
func startProfiling(ctx context.Context, client MinioAdmin, profilerType string) ([]*models.StartProfilingItem, error) {
profilingResults, err := client.startProfiling(ctx, madmin.ProfilerType(profilerType))
func getProfileOptionsFromReq(req *http.Request) (*profileOptions, error) {
pOptions := profileOptions{}
pOptions.Types = req.FormValue("types")
return &pOptions, nil
}
func startProfiling(ctx context.Context, conn WSConn, client MinioAdmin, pOpts *profileOptions) error {
profilingResults, err := client.startProfiling(ctx, madmin.ProfilerType(pOpts.Types))
if err != nil {
return nil, err
return err
}
var items []*models.StartProfilingItem
items = []*models.StartProfilingItem{}
for _, result := range profilingResults {
items = append(items, &models.StartProfilingItem{
Success: result.Success,
@@ -78,57 +51,13 @@ func startProfiling(ctx context.Context, client MinioAdmin, profilerType string)
NodeName: result.NodeName,
})
}
return items, nil
}
// getProfilingStartResponse performs startProfiling() and serializes it to the handler's output
func getProfilingStartResponse(session *models.Principal, params profileApi.ProfilingStartParams) (*models.StartProfilingList, *models.Error) {
ctx, cancel := context.WithCancel(params.HTTPRequest.Context())
defer cancel()
if params.Body == nil {
return nil, ErrorWithContext(ctx, ErrPolicyBodyNotInRequest)
}
mAdmin, err := NewMinioAdminClient(session)
if err != nil {
return nil, ErrorWithContext(ctx, err)
}
// create a MinIO Admin Client interface implementation
// defining the client to be used
adminClient := AdminClient{Client: mAdmin}
profilingItems, err := startProfiling(ctx, adminClient, *params.Body.Type)
if err != nil {
return nil, ErrorWithContext(ctx, err)
}
profilingList := &models.StartProfilingList{
StartResults: profilingItems,
Total: int64(len(profilingItems)),
}
return profilingList, nil
}
// stopProfiling() stop the profiling on the Minio server and returns
// the generated Zip file as io.ReadCloser
func stopProfiling(ctx context.Context, client MinioAdmin) (io.ReadCloser, error) {
zippedData, err := client.stopProfiling(ctx)
if err != nil {
return nil, err
return err
}
return zippedData, nil
}
// getProfilingStopResponse() performs SetPolicy() and serializes it to the handler's output
func getProfilingStopResponse(session *models.Principal, params profileApi.ProfilingStopParams) (io.ReadCloser, *models.Error) {
ctx := params.HTTPRequest.Context()
mAdmin, err := NewMinioAdminClient(session)
message, err := ioutil.ReadAll(zippedData)
if err != nil {
return nil, ErrorWithContext(ctx, err)
return err
}
// create a MinIO Admin Client interface implementation
// defining the client to be used
adminClient := AdminClient{Client: mAdmin}
profilingData, err := stopProfiling(ctx, adminClient)
if err != nil {
return nil, ErrorWithContext(ctx, err)
}
return profilingData, nil
return conn.writeMessage(websocket.BinaryMessage, message)
}

View File

@@ -21,6 +21,8 @@ import (
"context"
"errors"
"io"
"net/http"
"net/url"
"testing"
"github.com/minio/madmin-go"
@@ -32,22 +34,38 @@ var (
minioStopProfiling func() (io.ReadCloser, error)
)
// mock function of startProfiling()
// mock function for startProfiling()
func (ac adminClientMock) startProfiling(ctx context.Context, profiler madmin.ProfilerType) ([]madmin.StartProfilingResult, error) {
return minioStartProfiling(profiler)
}
// mock function of stopProfiling()
// mock function for stopProfiling()
func (ac adminClientMock) stopProfiling(ctx context.Context) (io.ReadCloser, error) {
return minioStopProfiling()
}
// Implementing fake closingBuffer to mock stopProfiling() (io.ReadCloser, error)
type ClosingBuffer struct {
*bytes.Buffer
}
// Implementing a fake Close function for io.ReadCloser
func (cb *ClosingBuffer) Close() error {
return nil
}
func TestStartProfiling(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
assert := assert.New(t)
adminClient := adminClientMock{}
// Test-1 : startProfiling() Get response from Minio server with one profiling object
mockWSConn := mockConn{}
function := "startProfiling()"
testOptions := &profileOptions{
Types: "cpu",
}
// Test-1 : startProfiling() Get response from MinIO server with one profiling object without errors
// mock function response from startProfiling()
minioStartProfiling = func(profiler madmin.ProfilerType) ([]madmin.StartProfilingResult, error) {
return []madmin.StartProfilingResult{
@@ -63,56 +81,40 @@ func TestStartProfiling(t *testing.T) {
},
}, nil
}
function := "startProfiling()"
cpuProfiler := "cpu"
startProfilingResults, err := startProfiling(ctx, adminClient, cpuProfiler)
// mock function response from stopProfiling()
minioStopProfiling = func() (io.ReadCloser, error) {
return &ClosingBuffer{bytes.NewBufferString("In memory string eaeae")}, nil
}
// mock function response from mockConn.writeMessage()
connWriteMessageMock = func(messageType int, p []byte) error {
return nil
}
err := startProfiling(ctx, mockWSConn, adminClient, testOptions)
if err != nil {
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
}
assert.Equal(2, len(startProfilingResults))
// Test-2 : startProfiling() Correctly handles errors returned by Minio
assert.Equal(err, nil)
// Test-2 : startProfiling() Correctly handles errors returned by MinIO
// mock function response from startProfiling()
minioStartProfiling = func(profiler madmin.ProfilerType) ([]madmin.StartProfilingResult, error) {
return nil, errors.New("error")
}
_, err = startProfiling(ctx, adminClient, cpuProfiler)
err = startProfiling(ctx, mockWSConn, adminClient, testOptions)
if assert.Error(err) {
assert.Equal("error", err.Error())
}
}
// Implementing fake closingBuffer need it to mock stopProfiling() (io.ReadCloser, error)
type ClosingBuffer struct {
*bytes.Buffer
}
// Implementing a fake Close function for io.ReadCloser
func (cb *ClosingBuffer) Close() error {
return nil
}
func TestStopProfiling(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
assert := assert.New(t)
adminClient := adminClientMock{}
// Test-1 : stopProfiling() Get response from Minio server and that response is a readCloser interface
// mock function response from startProfiling()
minioStopProfiling = func() (io.ReadCloser, error) {
return &ClosingBuffer{bytes.NewBufferString("In memory string eaeae")}, nil
// Test-3: getProfileOptionsFromReq() correctly returns profile options from request
u, _ := url.Parse("ws://localhost/ws/profile?types=cpu,mem,block,mutex,trace,threads,goroutines")
req := &http.Request{
URL: u,
}
function := "stopProfiling()"
_, err := stopProfiling(ctx, adminClient)
if err != nil {
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
}
// Test-2 : stopProfiling() Correctly handles errors returned by Minio
// mock function response from stopProfiling()
minioStopProfiling = func() (io.ReadCloser, error) {
return nil, errors.New("error")
}
_, err = stopProfiling(ctx, adminClient)
if assert.Error(err) {
assert.Equal("error", err.Error())
opts, err := getProfileOptionsFromReq(req)
if assert.NoError(err) {
expectedOptions := profileOptions{
Types: "cpu,mem,block,mutex,trace,threads,goroutines",
}
assert.Equal(expectedOptions.Types, opts.Types)
}
}

View File

@@ -118,8 +118,6 @@ func configureAPI(api *operations.ConsoleAPI) http.Handler {
registerBucketsLifecycleHandlers(api)
// Register service handlers
registerServiceHandlers(api)
// Register profiling handlers
registerProfilingHandler(api)
// Register session handlers
registerSessionHandlers(api)
// Register version handlers

View File

@@ -250,7 +250,6 @@ func serveWS(w http.ResponseWriter, req *http.Request) {
closeWsConn(conn)
return
}
wsAdminClient, err := newWebSocketAdminClient(conn, session)
if err != nil {
ErrorWithContext(ctx, err)
@@ -258,6 +257,20 @@ func serveWS(w http.ResponseWriter, req *http.Request) {
return
}
go wsAdminClient.speedtest(ctx, speedtestOpts)
case strings.HasPrefix(wsPath, `/profile`):
pOptions, err := getProfileOptionsFromReq(req)
if err != nil {
ErrorWithContext(ctx, fmt.Errorf("error getting profile options: %v", err))
closeWsConn(conn)
return
}
wsAdminClient, err := newWebSocketAdminClient(conn, session)
if err != nil {
ErrorWithContext(ctx, err)
closeWsConn(conn)
return
}
go wsAdminClient.profile(ctx, pOptions)
default:
// path not found
@@ -464,6 +477,21 @@ func (wsc *wsAdminClient) speedtest(ctx context.Context, opts *madmin.SpeedtestO
sendWsCloseMessage(wsc.conn, err)
}
func (wsc *wsAdminClient) profile(ctx context.Context, opts *profileOptions) {
defer func() {
LogInfo("profile stopped")
// close connection after return
wsc.conn.close()
}()
LogInfo("profile started")
ctx = wsReadClientCtx(ctx, wsc.conn)
err := startProfiling(ctx, wsc.conn, wsc.client, opts)
sendWsCloseMessage(wsc.conn, err)
}
// sendWsCloseMessage sends Websocket Connection Close Message indicating the Status Code
// see https://tools.ietf.org/html/rfc6455#page-45
func sendWsCloseMessage(conn WSConn, err error) {