Error and Audit logger webhooks (#1855)
Similar to MinIO now it's possible to configure webhooks to log all triggered errors and incomming requests via env variables: ``` CONSOLE_LOGGER_WEBHOOK_ENABLE_<ID> CONSOLE_LOGGER_WEBHOOK_ENDPOINT_<ID> CONSOLE_LOGGER_WEBHOOK_AUTH_TOKEN_<ID> CONSOLE_LOGGER_WEBHOOK_CLIENT_CERT_<ID> CONSOLE_LOGGER_WEBHOOK_CLIENT_KEY_<ID> CONSOLE_LOGGER_WEBHOOK_QUEUE_SIZE_<ID> CONSOLE_AUDIT_WEBHOOK_ENABLE_<ID> CONSOLE_AUDIT_WEBHOOK_ENDPOINT_<ID> CONSOLE_AUDIT_WEBHOOK_AUTH_TOKEN_<ID> CONSOLE_AUDIT_WEBHOOK_CLIENT_CERT_<ID> CONSOLE_AUDIT_WEBHOOK_QUEUE_SIZE_<ID> ``` Signed-off-by: Lenin Alevski <alevsk.8772@gmail.com>
This commit is contained in:
@@ -18,12 +18,15 @@ package restapi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/minio/console/pkg/utils"
|
||||
|
||||
"github.com/go-openapi/errors"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/minio/console/models"
|
||||
@@ -121,17 +124,25 @@ func (c wsConn) readMessage() (messageType int, p []byte, err error) {
|
||||
// on the path.
|
||||
// Request should come like ws://<host>:<port>/ws/<api>
|
||||
func serveWS(w http.ResponseWriter, req *http.Request) {
|
||||
ctx := req.Context()
|
||||
// Perform authentication before upgrading to a Websocket Connection
|
||||
// authenticate WS connection with Console
|
||||
session, err := auth.GetClaimsFromTokenInRequest(req)
|
||||
if err != nil {
|
||||
ErrorWithContext(ctx, err)
|
||||
errors.ServeError(w, req, errors.New(http.StatusUnauthorized, err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
//// DELETE ME !!!
|
||||
//upgrader.CheckOrigin = func(r *http.Request) bool {
|
||||
// return true
|
||||
//}
|
||||
|
||||
// upgrades the HTTP server connection to the WebSocket protocol.
|
||||
conn, err := upgrader.Upgrade(w, req, nil)
|
||||
if err != nil {
|
||||
ErrorWithContext(ctx, err)
|
||||
errors.ServeError(w, req, err)
|
||||
return
|
||||
}
|
||||
@@ -141,6 +152,7 @@ func serveWS(w http.ResponseWriter, req *http.Request) {
|
||||
case strings.HasPrefix(wsPath, `/trace`):
|
||||
wsAdminClient, err := newWebSocketAdminClient(conn, session)
|
||||
if err != nil {
|
||||
ErrorWithContext(ctx, err)
|
||||
closeWsConn(conn)
|
||||
return
|
||||
}
|
||||
@@ -172,11 +184,12 @@ func serveWS(w http.ResponseWriter, req *http.Request) {
|
||||
path: path,
|
||||
}
|
||||
|
||||
go wsAdminClient.trace(traceRequestItem)
|
||||
go wsAdminClient.trace(ctx, traceRequestItem)
|
||||
case strings.HasPrefix(wsPath, `/console`):
|
||||
|
||||
wsAdminClient, err := newWebSocketAdminClient(conn, session)
|
||||
if err != nil {
|
||||
ErrorWithContext(ctx, err)
|
||||
closeWsConn(conn)
|
||||
return
|
||||
}
|
||||
@@ -187,60 +200,64 @@ func serveWS(w http.ResponseWriter, req *http.Request) {
|
||||
node: node,
|
||||
logType: logType,
|
||||
}
|
||||
go wsAdminClient.console(logRequestItem)
|
||||
go wsAdminClient.console(ctx, logRequestItem)
|
||||
case strings.HasPrefix(wsPath, `/health-info`):
|
||||
deadline, err := getHealthInfoOptionsFromReq(req)
|
||||
if err != nil {
|
||||
LogError("error getting health info options: %v", err)
|
||||
ErrorWithContext(ctx, fmt.Errorf("error getting health info options: %v", err))
|
||||
closeWsConn(conn)
|
||||
return
|
||||
}
|
||||
wsAdminClient, err := newWebSocketAdminClient(conn, session)
|
||||
if err != nil {
|
||||
ErrorWithContext(ctx, err)
|
||||
closeWsConn(conn)
|
||||
return
|
||||
}
|
||||
go wsAdminClient.healthInfo(deadline)
|
||||
go wsAdminClient.healthInfo(ctx, deadline)
|
||||
case strings.HasPrefix(wsPath, `/heal`):
|
||||
hOptions, err := getHealOptionsFromReq(req)
|
||||
if err != nil {
|
||||
LogError("error getting heal options: %v", err)
|
||||
ErrorWithContext(ctx, fmt.Errorf("error getting heal options: %v", err))
|
||||
closeWsConn(conn)
|
||||
return
|
||||
}
|
||||
wsAdminClient, err := newWebSocketAdminClient(conn, session)
|
||||
if err != nil {
|
||||
ErrorWithContext(ctx, err)
|
||||
closeWsConn(conn)
|
||||
return
|
||||
}
|
||||
go wsAdminClient.heal(hOptions)
|
||||
go wsAdminClient.heal(ctx, hOptions)
|
||||
case strings.HasPrefix(wsPath, `/watch`):
|
||||
wOptions, err := getWatchOptionsFromReq(req)
|
||||
if err != nil {
|
||||
LogError("error getting watch options: %v", err)
|
||||
ErrorWithContext(ctx, fmt.Errorf("error getting watch options: %v", err))
|
||||
closeWsConn(conn)
|
||||
return
|
||||
}
|
||||
wsS3Client, err := newWebSocketS3Client(conn, session, wOptions.BucketName)
|
||||
if err != nil {
|
||||
ErrorWithContext(ctx, err)
|
||||
closeWsConn(conn)
|
||||
return
|
||||
}
|
||||
go wsS3Client.watch(wOptions)
|
||||
go wsS3Client.watch(ctx, wOptions)
|
||||
case strings.HasPrefix(wsPath, `/speedtest`):
|
||||
speedtestOpts, err := getSpeedtestOptionsFromReq(req)
|
||||
if err != nil {
|
||||
LogError("error getting speedtest options: %v", err)
|
||||
ErrorWithContext(ctx, fmt.Errorf("error getting speedtest options: %v", err))
|
||||
closeWsConn(conn)
|
||||
return
|
||||
}
|
||||
|
||||
wsAdminClient, err := newWebSocketAdminClient(conn, session)
|
||||
if err != nil {
|
||||
ErrorWithContext(ctx, err)
|
||||
closeWsConn(conn)
|
||||
return
|
||||
}
|
||||
go wsAdminClient.speedtest(speedtestOpts)
|
||||
go wsAdminClient.speedtest(ctx, speedtestOpts)
|
||||
|
||||
default:
|
||||
// path not found
|
||||
@@ -292,26 +309,54 @@ func newWebSocketS3Client(conn *websocket.Conn, claims *models.Principal, bucket
|
||||
// if the client sends a Close Message the context will be
|
||||
// canceled. If the connection is closed the goroutine inside
|
||||
// will return.
|
||||
func wsReadClientCtx(conn WSConn) context.Context {
|
||||
func wsReadClientCtx(parentContext context.Context, conn WSConn) context.Context {
|
||||
// a cancel context is needed to end all goroutines used
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
var requestID string
|
||||
var SessionID string
|
||||
var UserAgent string
|
||||
var Host string
|
||||
var RemoteHost string
|
||||
|
||||
if val, o := parentContext.Value(utils.ContextRequestID).(string); o {
|
||||
requestID = val
|
||||
}
|
||||
if val, o := parentContext.Value(utils.ContextRequestUserID).(string); o {
|
||||
SessionID = val
|
||||
}
|
||||
if val, o := parentContext.Value(utils.ContextRequestUserAgent).(string); o {
|
||||
UserAgent = val
|
||||
}
|
||||
if val, o := parentContext.Value(utils.ContextRequestHost).(string); o {
|
||||
Host = val
|
||||
}
|
||||
if val, o := parentContext.Value(utils.ContextRequestRemoteAddr).(string); o {
|
||||
RemoteHost = val
|
||||
}
|
||||
|
||||
ctx = context.WithValue(ctx, utils.ContextRequestID, requestID)
|
||||
ctx = context.WithValue(ctx, utils.ContextRequestUserID, SessionID)
|
||||
ctx = context.WithValue(ctx, utils.ContextRequestUserAgent, UserAgent)
|
||||
ctx = context.WithValue(ctx, utils.ContextRequestHost, Host)
|
||||
ctx = context.WithValue(ctx, utils.ContextRequestRemoteAddr, RemoteHost)
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
for {
|
||||
_, _, err := conn.readMessage()
|
||||
if err != nil {
|
||||
// if error of type websocket.CloseError and is Unexpected
|
||||
// if errors of type websocket.CloseError and is Unexpected
|
||||
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
|
||||
LogError("error unexpected CloseError on ReadMessage: %v", err)
|
||||
ErrorWithContext(ctx, fmt.Errorf("error unexpected CloseError on ReadMessage: %v", err))
|
||||
return
|
||||
}
|
||||
// Not all errors are of type websocket.CloseError.
|
||||
if _, ok := err.(*websocket.CloseError); !ok {
|
||||
LogError("error on ReadMessage: %v", err)
|
||||
ErrorWithContext(ctx, fmt.Errorf("error on ReadMessage: %v", err))
|
||||
return
|
||||
}
|
||||
// else is an expected Close Error
|
||||
LogError("closed conn.ReadMessage: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -327,7 +372,7 @@ func closeWsConn(conn *websocket.Conn) {
|
||||
|
||||
// trace serves madmin.ServiceTraceInfo
|
||||
// on a Websocket connection.
|
||||
func (wsc *wsAdminClient) trace(traceRequestItem TraceRequest) {
|
||||
func (wsc *wsAdminClient) trace(ctx context.Context, traceRequestItem TraceRequest) {
|
||||
defer func() {
|
||||
LogInfo("trace stopped")
|
||||
// close connection after return
|
||||
@@ -335,7 +380,7 @@ func (wsc *wsAdminClient) trace(traceRequestItem TraceRequest) {
|
||||
}()
|
||||
LogInfo("trace started")
|
||||
|
||||
ctx := wsReadClientCtx(wsc.conn)
|
||||
ctx = wsReadClientCtx(ctx, wsc.conn)
|
||||
|
||||
err := startTraceInfo(ctx, wsc.conn, wsc.client, traceRequestItem)
|
||||
|
||||
@@ -344,7 +389,7 @@ func (wsc *wsAdminClient) trace(traceRequestItem TraceRequest) {
|
||||
|
||||
// console serves madmin.GetLogs
|
||||
// on a Websocket connection.
|
||||
func (wsc *wsAdminClient) console(logRequestItem LogRequest) {
|
||||
func (wsc *wsAdminClient) console(ctx context.Context, logRequestItem LogRequest) {
|
||||
defer func() {
|
||||
LogInfo("console logs stopped")
|
||||
// close connection after return
|
||||
@@ -352,14 +397,14 @@ func (wsc *wsAdminClient) console(logRequestItem LogRequest) {
|
||||
}()
|
||||
LogInfo("console logs started")
|
||||
|
||||
ctx := wsReadClientCtx(wsc.conn)
|
||||
ctx = wsReadClientCtx(ctx, wsc.conn)
|
||||
|
||||
err := startConsoleLog(ctx, wsc.conn, wsc.client, logRequestItem)
|
||||
|
||||
sendWsCloseMessage(wsc.conn, err)
|
||||
}
|
||||
|
||||
func (wsc *wsS3Client) watch(params *watchOptions) {
|
||||
func (wsc *wsS3Client) watch(ctx context.Context, params *watchOptions) {
|
||||
defer func() {
|
||||
LogInfo("watch stopped")
|
||||
// close connection after return
|
||||
@@ -367,14 +412,14 @@ func (wsc *wsS3Client) watch(params *watchOptions) {
|
||||
}()
|
||||
LogInfo("watch started")
|
||||
|
||||
ctx := wsReadClientCtx(wsc.conn)
|
||||
ctx = wsReadClientCtx(ctx, wsc.conn)
|
||||
|
||||
err := startWatch(ctx, wsc.conn, wsc.client, params)
|
||||
|
||||
sendWsCloseMessage(wsc.conn, err)
|
||||
}
|
||||
|
||||
func (wsc *wsAdminClient) heal(opts *healOptions) {
|
||||
func (wsc *wsAdminClient) heal(ctx context.Context, opts *healOptions) {
|
||||
defer func() {
|
||||
LogInfo("heal stopped")
|
||||
// close connection after return
|
||||
@@ -382,14 +427,14 @@ func (wsc *wsAdminClient) heal(opts *healOptions) {
|
||||
}()
|
||||
LogInfo("heal started")
|
||||
|
||||
ctx := wsReadClientCtx(wsc.conn)
|
||||
ctx = wsReadClientCtx(ctx, wsc.conn)
|
||||
|
||||
err := startHeal(ctx, wsc.conn, wsc.client, opts)
|
||||
|
||||
sendWsCloseMessage(wsc.conn, err)
|
||||
}
|
||||
|
||||
func (wsc *wsAdminClient) healthInfo(deadline *time.Duration) {
|
||||
func (wsc *wsAdminClient) healthInfo(ctx context.Context, deadline *time.Duration) {
|
||||
defer func() {
|
||||
LogInfo("health info stopped")
|
||||
// close connection after return
|
||||
@@ -397,14 +442,14 @@ func (wsc *wsAdminClient) healthInfo(deadline *time.Duration) {
|
||||
}()
|
||||
LogInfo("health info started")
|
||||
|
||||
ctx := wsReadClientCtx(wsc.conn)
|
||||
ctx = wsReadClientCtx(ctx, wsc.conn)
|
||||
|
||||
err := startHealthInfo(ctx, wsc.conn, wsc.client, deadline)
|
||||
|
||||
sendWsCloseMessage(wsc.conn, err)
|
||||
}
|
||||
|
||||
func (wsc *wsAdminClient) speedtest(opts *madmin.SpeedtestOpts) {
|
||||
func (wsc *wsAdminClient) speedtest(ctx context.Context, opts *madmin.SpeedtestOpts) {
|
||||
defer func() {
|
||||
LogInfo("speedtest stopped")
|
||||
// close connection after return
|
||||
@@ -412,7 +457,7 @@ func (wsc *wsAdminClient) speedtest(opts *madmin.SpeedtestOpts) {
|
||||
}()
|
||||
LogInfo("speedtest started")
|
||||
|
||||
ctx := wsReadClientCtx(wsc.conn)
|
||||
ctx = wsReadClientCtx(ctx, wsc.conn)
|
||||
|
||||
err := startSpeedtest(ctx, wsc.conn, wsc.client, opts)
|
||||
|
||||
@@ -434,7 +479,7 @@ func sendWsCloseMessage(conn WSConn, err error) {
|
||||
return
|
||||
}
|
||||
// else, internal server error
|
||||
conn.writeMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, ErrorGeneric.Error()))
|
||||
conn.writeMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, ErrDefault.Error()))
|
||||
return
|
||||
}
|
||||
// normal closure
|
||||
|
||||
Reference in New Issue
Block a user