diff --git a/integration/profiling_test.go b/integration/profiling_test.go index 2efd5efd0..909ab9092 100644 --- a/integration/profiling_test.go +++ b/integration/profiling_test.go @@ -20,18 +20,16 @@ import ( "archive/zip" "bytes" "fmt" - "io" "log" "net/http" - "strings" "testing" - "time" + "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" ) func TestStartProfiling(t *testing.T) { - testAsser := assert.New(t) + testAssert := assert.New(t) tests := []struct { name string @@ -58,53 +56,28 @@ func TestStartProfiling(t *testing.T) { "profile-127.0.0.1:9000-mutex-before.pprof": false, } - client := &http.Client{ - Timeout: 3 * time.Second, - } + wsDestination := "/ws/profile?types=cpu,mem,block,mutex,trace,threads,goroutines" + wsFinalURL := fmt.Sprintf("ws://localhost:9090%s", wsDestination) - destination := "/api/v1/profiling/start" - finalURL := fmt.Sprintf("http://localhost:9090%s", destination) - request, err := http.NewRequest("POST", finalURL, strings.NewReader("{\"type\":\"cpu,mem,block,mutex,trace,threads,goroutines\"}")) + ws, _, err := websocket.DefaultDialer.Dial(wsFinalURL, nil) + if err != nil { + log.Println(err) + return + } + defer ws.Close() + + _, zipFileBytes, err := ws.ReadMessage() if err != nil { log.Println(err) return } - request.Header.Add("Cookie", fmt.Sprintf("token=%s", token)) - request.Header.Add("Content-Type", "application/json") - - response, err := client.Do(request) - - testAsser.Nil(err, fmt.Sprintf("%s returned an error: %v", tt.name, err)) - testAsser.Equal(201, response.StatusCode) - - time.Sleep(5 * time.Second) - - destination = "/api/v1/profiling/stop" - finalURL = fmt.Sprintf("http://localhost:9090%s", destination) - request, err = http.NewRequest("POST", finalURL, nil) - if err != nil { - log.Println(err) - return - } - - request.Header.Add("Cookie", fmt.Sprintf("token=%s", token)) - request.Header.Add("Content-Type", "application/json") - - response, err = client.Do(request) - - testAsser.Nil(err, fmt.Sprintf("%s returned an error: %v", tt.name, err)) - testAsser.Equal(200, response.StatusCode) - zipFileBytes, err := io.ReadAll(response.Body) - if err != nil { - testAsser.Nil(err, fmt.Sprintf("%s returned an error: %v", tt.name, err)) - } filetype := http.DetectContentType(zipFileBytes) - testAsser.Equal("application/zip", filetype) + testAssert.Equal("application/zip", filetype) zipReader, err := zip.NewReader(bytes.NewReader(zipFileBytes), int64(len(zipFileBytes))) if err != nil { - testAsser.Nil(err, fmt.Sprintf("%s returned an error: %v", tt.name, err)) + testAssert.Nil(err, fmt.Sprintf("%s returned an error: %v", tt.name, err)) } // Read all the files from zip archive @@ -113,7 +86,7 @@ func TestStartProfiling(t *testing.T) { } for k, v := range files { - testAsser.Equal(true, v, fmt.Sprintf("%s : compressed file expected to have %v file inside", tt.name, k)) + testAssert.Equal(true, v, fmt.Sprintf("%s : compressed file expected to have %v file inside", tt.name, k)) } }) } diff --git a/portal-ui/src/screens/Console/Support/Profile.tsx b/portal-ui/src/screens/Console/Support/Profile.tsx index 33e809d6d..9a323648f 100644 --- a/portal-ui/src/screens/Console/Support/Profile.tsx +++ b/portal-ui/src/screens/Console/Support/Profile.tsx @@ -1,4 +1,5 @@ import React, { Fragment, useState } from "react"; +import { IMessageEvent, w3cwebsocket as W3CWebSocket } from "websocket"; import { Theme } from "@mui/material/styles"; import createStyles from "@mui/styles/createStyles"; import withStyles from "@mui/styles/withStyles"; @@ -6,8 +7,7 @@ import { Button, Grid } from "@mui/material"; import PageHeader from "../Common/PageHeader/PageHeader"; import PageLayout from "../Common/Layout/PageLayout"; import CheckboxWrapper from "../Common/FormComponents/CheckboxWrapper/CheckboxWrapper"; -import api from "../../../common/api"; -import { ErrorResponseHandler } from "../../../common/types"; +import { wsProtocol } from "../../../utils/wsUtils"; import { actionsTray, containerForHeader, @@ -52,6 +52,8 @@ interface IProfileProps { classes: any; } +var c: any = null; + const Profile = ({ classes }: IProfileProps) => { const [profilingStarted, setProfilingStarted] = useState(false); const [types, setTypes] = useState([ @@ -85,43 +87,53 @@ const Profile = ({ classes }: IProfileProps) => { }; const startProfiling = () => { - if (!profilingStarted) { - const typeString = types.join(","); - setProfilingStarted(true); - api - .invoke("POST", `/api/v1/profiling/start`, { - type: typeString, - }) - .then(() => {}) - .catch((err: ErrorResponseHandler) => { - console.log(err); - setProfilingStarted(false); - }); + const typeString = types.join(","); + + const url = new URL(window.location.toString()); + const isDev = process.env.NODE_ENV === "development"; + const port = isDev ? "9090" : url.port; + + // check if we are using base path, if not this always is `/` + const baseLocation = new URL(document.baseURI); + const baseUrl = baseLocation.pathname; + + const wsProt = wsProtocol(url.protocol); + c = new W3CWebSocket( + `${wsProt}://${url.hostname}:${port}${baseUrl}ws/profile?types=${typeString}` + ); + + if (c !== null) { + c.onopen = () => { + setProfilingStarted(true); + c.send("ok"); + }; + c.onmessage = (message: IMessageEvent) => { + // process received message + let response = new Blob([message.data], { type: "application/zip" }); + let filename = "profile.zip"; + setProfilingStarted(false); + var link = document.createElement("a"); + link.href = window.URL.createObjectURL(response); + link.download = filename; + document.body.appendChild(link); + link.click(); + document.body.removeChild(link); + }; + c.onclose = () => { + console.log("connection closed by server"); + setProfilingStarted(false); + }; + return () => { + c.close(1000); + console.log("closing websockets"); + setProfilingStarted(false); + }; } }; const stopProfiling = () => { - if (profilingStarted) { - const anchor = document.createElement("a"); - document.body.appendChild(anchor); - let path = "/api/v1/profiling/stop"; - var req = new XMLHttpRequest(); - req.open("POST", path, true); - req.responseType = "blob"; - req.onreadystatechange = () => { - if (req.readyState === 4 && req.status === 200) { - let filename = "profile.zip"; - setProfilingStarted(false); - var link = document.createElement("a"); - link.href = window.URL.createObjectURL(req.response); - link.download = filename; - document.body.appendChild(link); - link.click(); - document.body.removeChild(link); - } - }; - req.send(); - } + c.close(1000); + setProfilingStarted(false); }; return ( diff --git a/restapi/admin_profiling.go b/restapi/admin_profiling.go index 0ad11c267..f07e373cf 100644 --- a/restapi/admin_profiling.go +++ b/restapi/admin_profiling.go @@ -18,59 +18,32 @@ package restapi import ( "context" - "io" + "io/ioutil" "net/http" - "github.com/go-openapi/runtime" - "github.com/go-openapi/runtime/middleware" + "github.com/gorilla/websocket" "github.com/minio/console/models" - "github.com/minio/console/restapi/operations" - profileApi "github.com/minio/console/restapi/operations/profile" "github.com/minio/madmin-go" ) -func registerProfilingHandler(api *operations.ConsoleAPI) { - // Start Profiling - api.ProfileProfilingStartHandler = profileApi.ProfilingStartHandlerFunc(func(params profileApi.ProfilingStartParams, session *models.Principal) middleware.Responder { - profilingStartResponse, err := getProfilingStartResponse(session, params) - if err != nil { - return profileApi.NewProfilingStartDefault(int(err.Code)).WithPayload(err) - } - return profileApi.NewProfilingStartCreated().WithPayload(profilingStartResponse) - }) - // Stop and download profiling data - api.ProfileProfilingStopHandler = profileApi.ProfilingStopHandlerFunc(func(params profileApi.ProfilingStopParams, session *models.Principal) middleware.Responder { - profilingStopResponse, err := getProfilingStopResponse(session, params) - if err != nil { - return profileApi.NewProfilingStopDefault(int(err.Code)).WithPayload(err) - } - // Custom response writer to set the content-disposition header to tell the - // HTTP client the name and extension of the file we are returning - return middleware.ResponderFunc(func(w http.ResponseWriter, _ runtime.Producer) { - defer profilingStopResponse.Close() - w.Header().Set("Content-Type", "application/zip") - w.Header().Set("Content-Disposition", "attachment; filename=profile.zip") - io.Copy(w, profilingStopResponse) - }) - }) +var items []*models.StartProfilingItem + +type profileOptions struct { + Types string } -// startProfiling() starts the profiling on the Minio server -// Enable 1 of the 7 profiling mechanisms: "cpu","mem","block","mutex","trace","threads","goroutines" -// in the Minio server, returns []*models.StartProfilingItem that contains individual status of this operation -// for each Minio node, ie: -// -// { -// "Success": true, -// "nodeName": "127.0.0.1:9000" -// "errors": "" -// } -func startProfiling(ctx context.Context, client MinioAdmin, profilerType string) ([]*models.StartProfilingItem, error) { - profilingResults, err := client.startProfiling(ctx, madmin.ProfilerType(profilerType)) +func getProfileOptionsFromReq(req *http.Request) (*profileOptions, error) { + pOptions := profileOptions{} + pOptions.Types = req.FormValue("types") + return &pOptions, nil +} + +func startProfiling(ctx context.Context, conn WSConn, client MinioAdmin, pOpts *profileOptions) error { + profilingResults, err := client.startProfiling(ctx, madmin.ProfilerType(pOpts.Types)) if err != nil { - return nil, err + return err } - var items []*models.StartProfilingItem + items = []*models.StartProfilingItem{} for _, result := range profilingResults { items = append(items, &models.StartProfilingItem{ Success: result.Success, @@ -78,57 +51,13 @@ func startProfiling(ctx context.Context, client MinioAdmin, profilerType string) NodeName: result.NodeName, }) } - return items, nil -} - -// getProfilingStartResponse performs startProfiling() and serializes it to the handler's output -func getProfilingStartResponse(session *models.Principal, params profileApi.ProfilingStartParams) (*models.StartProfilingList, *models.Error) { - ctx, cancel := context.WithCancel(params.HTTPRequest.Context()) - defer cancel() - if params.Body == nil { - return nil, ErrorWithContext(ctx, ErrPolicyBodyNotInRequest) - } - mAdmin, err := NewMinioAdminClient(session) - if err != nil { - return nil, ErrorWithContext(ctx, err) - } - // create a MinIO Admin Client interface implementation - // defining the client to be used - adminClient := AdminClient{Client: mAdmin} - profilingItems, err := startProfiling(ctx, adminClient, *params.Body.Type) - if err != nil { - return nil, ErrorWithContext(ctx, err) - } - profilingList := &models.StartProfilingList{ - StartResults: profilingItems, - Total: int64(len(profilingItems)), - } - return profilingList, nil -} - -// stopProfiling() stop the profiling on the Minio server and returns -// the generated Zip file as io.ReadCloser -func stopProfiling(ctx context.Context, client MinioAdmin) (io.ReadCloser, error) { zippedData, err := client.stopProfiling(ctx) if err != nil { - return nil, err + return err } - return zippedData, nil -} - -// getProfilingStopResponse() performs SetPolicy() and serializes it to the handler's output -func getProfilingStopResponse(session *models.Principal, params profileApi.ProfilingStopParams) (io.ReadCloser, *models.Error) { - ctx := params.HTTPRequest.Context() - mAdmin, err := NewMinioAdminClient(session) + message, err := ioutil.ReadAll(zippedData) if err != nil { - return nil, ErrorWithContext(ctx, err) + return err } - // create a MinIO Admin Client interface implementation - // defining the client to be used - adminClient := AdminClient{Client: mAdmin} - profilingData, err := stopProfiling(ctx, adminClient) - if err != nil { - return nil, ErrorWithContext(ctx, err) - } - return profilingData, nil + return conn.writeMessage(websocket.BinaryMessage, message) } diff --git a/restapi/admin_profiling_test.go b/restapi/admin_profiling_test.go index 591a2cb59..e95a463f6 100644 --- a/restapi/admin_profiling_test.go +++ b/restapi/admin_profiling_test.go @@ -21,6 +21,8 @@ import ( "context" "errors" "io" + "net/http" + "net/url" "testing" "github.com/minio/madmin-go" @@ -32,22 +34,38 @@ var ( minioStopProfiling func() (io.ReadCloser, error) ) -// mock function of startProfiling() +// mock function for startProfiling() func (ac adminClientMock) startProfiling(ctx context.Context, profiler madmin.ProfilerType) ([]madmin.StartProfilingResult, error) { return minioStartProfiling(profiler) } -// mock function of stopProfiling() +// mock function for stopProfiling() func (ac adminClientMock) stopProfiling(ctx context.Context) (io.ReadCloser, error) { return minioStopProfiling() } +// Implementing fake closingBuffer to mock stopProfiling() (io.ReadCloser, error) +type ClosingBuffer struct { + *bytes.Buffer +} + +// Implementing a fake Close function for io.ReadCloser +func (cb *ClosingBuffer) Close() error { + return nil +} + func TestStartProfiling(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() assert := assert.New(t) adminClient := adminClientMock{} - // Test-1 : startProfiling() Get response from Minio server with one profiling object + mockWSConn := mockConn{} + function := "startProfiling()" + testOptions := &profileOptions{ + Types: "cpu", + } + + // Test-1 : startProfiling() Get response from MinIO server with one profiling object without errors // mock function response from startProfiling() minioStartProfiling = func(profiler madmin.ProfilerType) ([]madmin.StartProfilingResult, error) { return []madmin.StartProfilingResult{ @@ -63,56 +81,40 @@ func TestStartProfiling(t *testing.T) { }, }, nil } - function := "startProfiling()" - cpuProfiler := "cpu" - startProfilingResults, err := startProfiling(ctx, adminClient, cpuProfiler) + // mock function response from stopProfiling() + minioStopProfiling = func() (io.ReadCloser, error) { + return &ClosingBuffer{bytes.NewBufferString("In memory string eaeae")}, nil + } + // mock function response from mockConn.writeMessage() + connWriteMessageMock = func(messageType int, p []byte) error { + return nil + } + err := startProfiling(ctx, mockWSConn, adminClient, testOptions) if err != nil { t.Errorf("Failed on %s:, error occurred: %s", function, err.Error()) } - assert.Equal(2, len(startProfilingResults)) - // Test-2 : startProfiling() Correctly handles errors returned by Minio + assert.Equal(err, nil) + + // Test-2 : startProfiling() Correctly handles errors returned by MinIO // mock function response from startProfiling() minioStartProfiling = func(profiler madmin.ProfilerType) ([]madmin.StartProfilingResult, error) { return nil, errors.New("error") } - _, err = startProfiling(ctx, adminClient, cpuProfiler) + err = startProfiling(ctx, mockWSConn, adminClient, testOptions) if assert.Error(err) { assert.Equal("error", err.Error()) } -} -// Implementing fake closingBuffer need it to mock stopProfiling() (io.ReadCloser, error) -type ClosingBuffer struct { - *bytes.Buffer -} - -// Implementing a fake Close function for io.ReadCloser -func (cb *ClosingBuffer) Close() error { - return nil -} - -func TestStopProfiling(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - assert := assert.New(t) - adminClient := adminClientMock{} - // Test-1 : stopProfiling() Get response from Minio server and that response is a readCloser interface - // mock function response from startProfiling() - minioStopProfiling = func() (io.ReadCloser, error) { - return &ClosingBuffer{bytes.NewBufferString("In memory string eaeae")}, nil + // Test-3: getProfileOptionsFromReq() correctly returns profile options from request + u, _ := url.Parse("ws://localhost/ws/profile?types=cpu,mem,block,mutex,trace,threads,goroutines") + req := &http.Request{ + URL: u, } - function := "stopProfiling()" - _, err := stopProfiling(ctx, adminClient) - if err != nil { - t.Errorf("Failed on %s:, error occurred: %s", function, err.Error()) - } - // Test-2 : stopProfiling() Correctly handles errors returned by Minio - // mock function response from stopProfiling() - minioStopProfiling = func() (io.ReadCloser, error) { - return nil, errors.New("error") - } - _, err = stopProfiling(ctx, adminClient) - if assert.Error(err) { - assert.Equal("error", err.Error()) + opts, err := getProfileOptionsFromReq(req) + if assert.NoError(err) { + expectedOptions := profileOptions{ + Types: "cpu,mem,block,mutex,trace,threads,goroutines", + } + assert.Equal(expectedOptions.Types, opts.Types) } } diff --git a/restapi/configure_console.go b/restapi/configure_console.go index dfa2cf89c..7c0e446c8 100644 --- a/restapi/configure_console.go +++ b/restapi/configure_console.go @@ -118,8 +118,6 @@ func configureAPI(api *operations.ConsoleAPI) http.Handler { registerBucketsLifecycleHandlers(api) // Register service handlers registerServiceHandlers(api) - // Register profiling handlers - registerProfilingHandler(api) // Register session handlers registerSessionHandlers(api) // Register version handlers diff --git a/restapi/ws_handle.go b/restapi/ws_handle.go index 8abdf3415..4e1f22573 100644 --- a/restapi/ws_handle.go +++ b/restapi/ws_handle.go @@ -250,7 +250,6 @@ func serveWS(w http.ResponseWriter, req *http.Request) { closeWsConn(conn) return } - wsAdminClient, err := newWebSocketAdminClient(conn, session) if err != nil { ErrorWithContext(ctx, err) @@ -258,6 +257,20 @@ func serveWS(w http.ResponseWriter, req *http.Request) { return } go wsAdminClient.speedtest(ctx, speedtestOpts) + case strings.HasPrefix(wsPath, `/profile`): + pOptions, err := getProfileOptionsFromReq(req) + if err != nil { + ErrorWithContext(ctx, fmt.Errorf("error getting profile options: %v", err)) + closeWsConn(conn) + return + } + wsAdminClient, err := newWebSocketAdminClient(conn, session) + if err != nil { + ErrorWithContext(ctx, err) + closeWsConn(conn) + return + } + go wsAdminClient.profile(ctx, pOptions) default: // path not found @@ -464,6 +477,21 @@ func (wsc *wsAdminClient) speedtest(ctx context.Context, opts *madmin.SpeedtestO sendWsCloseMessage(wsc.conn, err) } +func (wsc *wsAdminClient) profile(ctx context.Context, opts *profileOptions) { + defer func() { + LogInfo("profile stopped") + // close connection after return + wsc.conn.close() + }() + LogInfo("profile started") + + ctx = wsReadClientCtx(ctx, wsc.conn) + + err := startProfiling(ctx, wsc.conn, wsc.client, opts) + + sendWsCloseMessage(wsc.conn, err) +} + // sendWsCloseMessage sends Websocket Connection Close Message indicating the Status Code // see https://tools.ietf.org/html/rfc6455#page-45 func sendWsCloseMessage(conn WSConn, err error) {