Improved trace functionality & added filters support (#817)
Co-authored-by: Benjamin Perez <benjamin@bexsoft.net>
This commit is contained in:
@@ -47,15 +47,54 @@ type callStats struct {
|
||||
Ttfb string `json:"timeToFirstByte"`
|
||||
}
|
||||
|
||||
type serviceTraceOpts struct {
|
||||
AllTraffic bool
|
||||
ErrOnly bool
|
||||
// trace filters
|
||||
func matchTrace(opts TraceRequest, traceInfo madmin.ServiceTraceInfo) bool {
|
||||
statusCode := int(opts.statusCode)
|
||||
method := opts.method
|
||||
funcName := opts.funcName
|
||||
apiPath := opts.path
|
||||
|
||||
if statusCode == 0 && method == "" && funcName == "" && apiPath == "" {
|
||||
// no specific filtering found trace all the requests
|
||||
return true
|
||||
}
|
||||
|
||||
// Filter request path if passed by the user
|
||||
if apiPath != "" {
|
||||
pathToLookup := strings.ToLower(apiPath)
|
||||
pathFromTrace := strings.ToLower(traceInfo.Trace.ReqInfo.Path)
|
||||
|
||||
return strings.Contains(pathFromTrace, pathToLookup)
|
||||
}
|
||||
|
||||
// Filter response status codes if passed by the user
|
||||
if statusCode > 0 {
|
||||
statusCodeFromTrace := traceInfo.Trace.RespInfo.StatusCode
|
||||
|
||||
return statusCodeFromTrace == statusCode
|
||||
}
|
||||
|
||||
// Filter request method if passed by the user
|
||||
if method != "" {
|
||||
methodFromTrace := traceInfo.Trace.ReqInfo.Method
|
||||
|
||||
return methodFromTrace == method
|
||||
}
|
||||
|
||||
if funcName != "" {
|
||||
funcToLookup := strings.ToLower(funcName)
|
||||
funcFromTrace := strings.ToLower(traceInfo.Trace.FuncName)
|
||||
|
||||
return strings.Contains(funcFromTrace, funcToLookup)
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// startTraceInfo starts trace of the servers
|
||||
func startTraceInfo(ctx context.Context, conn WSConn, client MinioAdmin, opts serviceTraceOpts) error {
|
||||
func startTraceInfo(ctx context.Context, conn WSConn, client MinioAdmin, opts TraceRequest) error {
|
||||
// Start listening on all trace activity.
|
||||
traceCh := client.serviceTrace(ctx, opts.AllTraffic, opts.ErrOnly)
|
||||
traceCh := client.serviceTrace(ctx, opts.threshold, opts.s3, opts.internal, opts.storage, opts.os, opts.onlyErrors)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -69,17 +108,19 @@ func startTraceInfo(ctx context.Context, conn WSConn, client MinioAdmin, opts se
|
||||
LogError("error on serviceTrace: %v", traceInfo.Err)
|
||||
return traceInfo.Err
|
||||
}
|
||||
// Serialize message to be sent
|
||||
traceInfoBytes, err := json.Marshal(shortTrace(&traceInfo))
|
||||
if err != nil {
|
||||
LogError("error on json.Marshal: %v", err)
|
||||
return err
|
||||
}
|
||||
// Send Message through websocket connection
|
||||
err = conn.writeMessage(websocket.TextMessage, traceInfoBytes)
|
||||
if err != nil {
|
||||
LogError("error writeMessage: %v", err)
|
||||
return err
|
||||
if matchTrace(opts, traceInfo) {
|
||||
// Serialize message to be sent
|
||||
traceInfoBytes, err := json.Marshal(shortTrace(&traceInfo))
|
||||
if err != nil {
|
||||
LogError("error on json.Marshal: %v", err)
|
||||
return err
|
||||
}
|
||||
// Send Message through websocket connection
|
||||
err = conn.writeMessage(websocket.TextMessage, traceInfoBytes)
|
||||
if err != nil {
|
||||
LogError("error writeMessage: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -100,6 +141,7 @@ func shortTrace(info *madmin.ServiceTraceInfo) shortTraceMsg {
|
||||
s.CallStats.Duration = t.CallStats.Latency.String()
|
||||
s.CallStats.Rx = t.CallStats.InputBytes
|
||||
s.CallStats.Tx = t.CallStats.OutputBytes
|
||||
s.CallStats.Ttfb = t.CallStats.TimeToFirstByte.String()
|
||||
|
||||
if host, ok := t.ReqInfo.Headers["Host"]; ok {
|
||||
s.Host = strings.Join(host, "")
|
||||
|
||||
@@ -27,11 +27,11 @@ import (
|
||||
)
|
||||
|
||||
// assigning mock at runtime instead of compile time
|
||||
var minioServiceTraceMock func(ctx context.Context, allTrace, errTrace bool) <-chan madmin.ServiceTraceInfo
|
||||
var minioServiceTraceMock func(ctx context.Context, threshold int64, s3, internal, storage, os, errTrace bool) <-chan madmin.ServiceTraceInfo
|
||||
|
||||
// mock function of serviceTrace()
|
||||
func (ac adminClientMock) serviceTrace(ctx context.Context, allTrace, errTrace bool) <-chan madmin.ServiceTraceInfo {
|
||||
return minioServiceTraceMock(ctx, allTrace, errTrace)
|
||||
func (ac adminClientMock) serviceTrace(ctx context.Context, threshold int64, s3, internal, storage, os, errTrace bool) <-chan madmin.ServiceTraceInfo {
|
||||
return minioServiceTraceMock(ctx, threshold, s3, internal, storage, os, errTrace)
|
||||
}
|
||||
|
||||
func TestAdminTrace(t *testing.T) {
|
||||
@@ -48,7 +48,7 @@ func TestAdminTrace(t *testing.T) {
|
||||
|
||||
// Test-1: Serve Trace with no errors until trace finishes sending
|
||||
// define mock function behavior for minio server Trace
|
||||
minioServiceTraceMock = func(ctx context.Context, allTrace, errTrace bool) <-chan madmin.ServiceTraceInfo {
|
||||
minioServiceTraceMock = func(ctx context.Context, threshold int64, s3, internal, storage, os, errTrace bool) <-chan madmin.ServiceTraceInfo {
|
||||
ch := make(chan madmin.ServiceTraceInfo)
|
||||
// Only success, start a routine to start reading line by line.
|
||||
go func(ch chan<- madmin.ServiceTraceInfo) {
|
||||
@@ -82,7 +82,7 @@ func TestAdminTrace(t *testing.T) {
|
||||
writesCount++
|
||||
return nil
|
||||
}
|
||||
if err := startTraceInfo(ctx, mockWSConn, adminClient, serviceTraceOpts{AllTraffic: true, ErrOnly: false}); err != nil {
|
||||
if err := startTraceInfo(ctx, mockWSConn, adminClient, TraceRequest{s3: true, internal: true, storage: true, os: true, onlyErrors: false}); err != nil {
|
||||
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
|
||||
}
|
||||
// check that the TestReceiver got the same number of data from trace.
|
||||
@@ -94,13 +94,13 @@ func TestAdminTrace(t *testing.T) {
|
||||
connWriteMessageMock = func(messageType int, data []byte) error {
|
||||
return fmt.Errorf("error on write")
|
||||
}
|
||||
if err := startTraceInfo(ctx, mockWSConn, adminClient, serviceTraceOpts{}); assert.Error(err) {
|
||||
if err := startTraceInfo(ctx, mockWSConn, adminClient, TraceRequest{}); assert.Error(err) {
|
||||
assert.Equal("error on write", err.Error())
|
||||
}
|
||||
|
||||
// Test-3: error happens on serviceTrace Minio, trace should stop
|
||||
// and error shall be returned.
|
||||
minioServiceTraceMock = func(ctx context.Context, allTrace, errTrace bool) <-chan madmin.ServiceTraceInfo {
|
||||
minioServiceTraceMock = func(ctx context.Context, threshold int64, s3, internal, storage, os, errTrace bool) <-chan madmin.ServiceTraceInfo {
|
||||
ch := make(chan madmin.ServiceTraceInfo)
|
||||
// Only success, start a routine to start reading line by line.
|
||||
go func(ch chan<- madmin.ServiceTraceInfo) {
|
||||
@@ -120,7 +120,7 @@ func TestAdminTrace(t *testing.T) {
|
||||
connWriteMessageMock = func(messageType int, data []byte) error {
|
||||
return nil
|
||||
}
|
||||
if err := startTraceInfo(ctx, mockWSConn, adminClient, serviceTraceOpts{}); assert.Error(err) {
|
||||
if err := startTraceInfo(ctx, mockWSConn, adminClient, TraceRequest{}); assert.Error(err) {
|
||||
assert.Equal("error on trace", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,7 +89,7 @@ type MinioAdmin interface {
|
||||
serverInfo(ctx context.Context) (madmin.InfoMessage, error)
|
||||
startProfiling(ctx context.Context, profiler madmin.ProfilerType) ([]madmin.StartProfilingResult, error)
|
||||
stopProfiling(ctx context.Context) (io.ReadCloser, error)
|
||||
serviceTrace(ctx context.Context, allTrace, errTrace bool) <-chan madmin.ServiceTraceInfo
|
||||
serviceTrace(ctx context.Context, threshold int64, s3, internal, storage, os, errTrace bool) <-chan madmin.ServiceTraceInfo
|
||||
getLogs(ctx context.Context, node string, lineCnt int, logKind string) <-chan madmin.LogInfo
|
||||
accountInfo(ctx context.Context) (madmin.AccountInfo, error)
|
||||
heal(ctx context.Context, bucket, prefix string, healOpts madmin.HealOpts, clientToken string,
|
||||
@@ -253,13 +253,16 @@ func (ac adminClient) stopProfiling(ctx context.Context) (io.ReadCloser, error)
|
||||
}
|
||||
|
||||
// implements madmin.ServiceTrace()
|
||||
func (ac adminClient) serviceTrace(ctx context.Context, allTrace, errTrace bool) <-chan madmin.ServiceTraceInfo {
|
||||
func (ac adminClient) serviceTrace(ctx context.Context, threshold int64, s3, internal, storage, os, errTrace bool) <-chan madmin.ServiceTraceInfo {
|
||||
thresholdT := time.Duration(threshold)
|
||||
|
||||
tracingOptions := madmin.ServiceTraceOpts{
|
||||
S3: true,
|
||||
OnlyErrors: errTrace,
|
||||
Internal: allTrace,
|
||||
Storage: allTrace,
|
||||
OS: allTrace,
|
||||
Internal: internal,
|
||||
Storage: storage,
|
||||
OS: os,
|
||||
Threshold: thresholdT,
|
||||
}
|
||||
|
||||
return ac.client.ServiceTrace(ctx, tracingOptions)
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"context"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -81,6 +82,20 @@ type wsConn struct {
|
||||
conn *websocket.Conn
|
||||
}
|
||||
|
||||
// TraceRequest describes a trace request: which call types to trace, the
// threshold and error-only switches, and the optional status code, method,
// function name and path filters.
type TraceRequest struct {
	// Call types to include in the trace.
	s3, internal, storage, os bool

	// threshold is handed to serviceTrace together with the call types.
	threshold int64
	// onlyErrors is handed to serviceTrace as its error-only switch.
	onlyErrors bool

	// Optional filters; a zero value leaves the filter unset.
	statusCode             int64
	method, funcName, path string
}
|
||||
|
||||
func (c wsConn) writeMessage(messageType int, data []byte) error {
|
||||
return c.conn.WriteMessage(messageType, data)
|
||||
}
|
||||
@@ -122,7 +137,35 @@ func serveWS(w http.ResponseWriter, req *http.Request) {
|
||||
closeWsConn(conn)
|
||||
return
|
||||
}
|
||||
go wsAdminClient.trace()
|
||||
|
||||
calls := req.URL.Query().Get("calls")
|
||||
threshold, _ := strconv.ParseInt(req.URL.Query().Get("threshold"), 10, 64)
|
||||
onlyErrors := req.URL.Query().Get("onlyErrors")
|
||||
stCode, errorStCode := strconv.ParseInt(req.URL.Query().Get("statusCode"), 10, 64)
|
||||
method := req.URL.Query().Get("method")
|
||||
funcName := req.URL.Query().Get("funcname")
|
||||
path := req.URL.Query().Get("path")
|
||||
|
||||
statusCode := int64(0)
|
||||
|
||||
if errorStCode == nil {
|
||||
statusCode = stCode
|
||||
}
|
||||
|
||||
traceRequestItem := TraceRequest{
|
||||
s3: strings.Contains(calls, "s3") || strings.Contains(calls, "all"),
|
||||
internal: strings.Contains(calls, "internal") || strings.Contains(calls, "all"),
|
||||
storage: strings.Contains(calls, "storage") || strings.Contains(calls, "all"),
|
||||
os: strings.Contains(calls, "os") || strings.Contains(calls, "all"),
|
||||
onlyErrors: onlyErrors == "yes",
|
||||
threshold: threshold,
|
||||
statusCode: statusCode,
|
||||
method: method,
|
||||
funcName: funcName,
|
||||
path: path,
|
||||
}
|
||||
|
||||
go wsAdminClient.trace(traceRequestItem)
|
||||
case strings.HasPrefix(wsPath, `/console`):
|
||||
wsAdminClient, err := newWebSocketAdminClient(conn, session)
|
||||
if err != nil {
|
||||
@@ -254,7 +297,7 @@ func closeWsConn(conn *websocket.Conn) {
|
||||
|
||||
// trace serves madmin.ServiceTraceInfo
|
||||
// on a Websocket connection.
|
||||
func (wsc *wsAdminClient) trace() {
|
||||
func (wsc *wsAdminClient) trace(traceRequestItem TraceRequest) {
|
||||
defer func() {
|
||||
LogInfo("trace stopped")
|
||||
// close connection after return
|
||||
@@ -264,10 +307,7 @@ func (wsc *wsAdminClient) trace() {
|
||||
|
||||
ctx := wsReadClientCtx(wsc.conn)
|
||||
|
||||
err := startTraceInfo(ctx, wsc.conn, wsc.client, serviceTraceOpts{
|
||||
AllTraffic: false,
|
||||
ErrOnly: false,
|
||||
})
|
||||
err := startTraceInfo(ctx, wsc.conn, wsc.client, traceRequestItem)
|
||||
|
||||
sendWsCloseMessage(wsc.conn, err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user