diff --git a/cmd/config-current.go b/cmd/config-current.go index 9b7c87ad5..353069641 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -574,18 +574,6 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) { logger.LogIf(ctx, fmt.Errorf("Unable to initialize logger/audit targets: %w", err)) } - for _, l := range loggerCfg.HTTP { - if l.Enabled { - l.LogOnce = logger.LogOnceIf - l.UserAgent = loggerUserAgent - l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) - // Enable http logging - if err = logger.AddTarget(http.New(l)); err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to initialize server logger HTTP target: %w", err)) - } - } - } - for _, l := range loggerCfg.AuditWebhook { if l.Enabled { l.LogOnce = logger.LogOnceIf @@ -663,6 +651,25 @@ func applyDynamicConfig(ctx context.Context, objAPI ObjectLayer, s config.Config return fmt.Errorf("Unable to apply scanner config: %w", err) } + // Logger webhook + loggerCfg, err := logger.LookupConfig(s) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("Unable to load logger webhook config: %w", err)) + } + userAgent := getUserAgent(getMinioMode()) + for n, l := range loggerCfg.HTTP { + if l.Enabled { + l.LogOnce = logger.LogOnceIf + l.UserAgent = userAgent + l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) + loggerCfg.HTTP[n] = l + } + } + err = logger.UpdateTargets(loggerCfg) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %w", err)) + } + // Apply configurations. // We should not fail after this. var setDriveCounts []int diff --git a/cmd/consolelogger.go b/cmd/consolelogger.go index fd8f31570..7b720eb11 100644 --- a/cmd/consolelogger.go +++ b/cmd/consolelogger.go @@ -150,6 +150,10 @@ func (sys *HTTPConsoleLoggerSys) Content() (logs []log.Entry) { return } +// Cancel - cancels the target +func (sys *HTTPConsoleLoggerSys) Cancel() { +} + // Send log message 'e' to console and publish to console // log pubsub system func (sys *HTTPConsoleLoggerSys) Send(e interface{}, logKind string) error { diff --git a/cmd/data-update-tracker_test.go b/cmd/data-update-tracker_test.go index 70d05fd15..995c00453 100644 --- a/cmd/data-update-tracker_test.go +++ b/cmd/data-update-tracker_test.go @@ -55,6 +55,9 @@ func (t *testingLogger) Init() error { return nil } +func (t *testingLogger) Cancel() { +} + func (t *testingLogger) Send(entry interface{}, errKind string) error { t.mu.Lock() defer t.mu.Unlock() diff --git a/internal/config/config.go b/internal/config/config.go index d835533cf..6d4211151 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -166,6 +166,7 @@ var SubSystemsDynamic = set.CreateStringSet( ScannerSubSys, HealSubSys, SubnetSubSys, + LoggerWebhookSubSys, ) // SubSystemsSingleTargets - subsystems which only support single target. diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index e2d57898e..ab00a6f8b 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -25,6 +25,8 @@ import ( "fmt" "net/http" "strings" + "sync" + "sync/atomic" "time" xhttp "github.com/minio/minio/internal/http" @@ -55,6 +57,9 @@ type Config struct { // buffer is full, new logs are just ignored and an error // is returned to the caller. type Target struct { + status int32 + wg sync.WaitGroup + // Channel of log entries logCh chan interface{} @@ -109,6 +114,7 @@ func (h *Target) Init() error { h.config.Endpoint, resp.Status) } + h.status = 1 go h.startHTTPLogger() return nil } @@ -120,53 +126,60 @@ func acceptedResponseStatusCode(code int) bool { return acceptedStatusCodeMap[code] } +func (h *Target) logEntry(entry interface{}) { + h.wg.Add(1) + defer h.wg.Done() + + logJSON, err := json.Marshal(&entry) + if err != nil { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), webhookCallTimeout) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + h.config.Endpoint, bytes.NewReader(logJSON)) + if err != nil { + h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint) + cancel() + return + } + req.Header.Set(xhttp.ContentType, "application/json") + + // Set user-agent to indicate MinIO release + // version to the configured log endpoint + req.Header.Set("User-Agent", h.config.UserAgent) + + if h.config.AuthToken != "" { + req.Header.Set("Authorization", h.config.AuthToken) + } + + client := http.Client{Transport: h.config.Transport} + resp, err := client.Do(req) + cancel() + if err != nil { + h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint) + return + } + + // Drain any response. + xhttp.DrainBody(resp.Body) + + if !acceptedResponseStatusCode(resp.StatusCode) { + switch resp.StatusCode { + case http.StatusForbidden: + h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.config.Endpoint, resp.Status), h.config.Endpoint) + default: + h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.config.Endpoint, resp.Status), h.config.Endpoint) + } + } +} + func (h *Target) startHTTPLogger() { // Create a routine which sends json logs received // from an internal channel. go func() { for entry := range h.logCh { - logJSON, err := json.Marshal(&entry) - if err != nil { - continue - } - - ctx, cancel := context.WithTimeout(context.Background(), webhookCallTimeout) - req, err := http.NewRequestWithContext(ctx, http.MethodPost, - h.config.Endpoint, bytes.NewReader(logJSON)) - if err != nil { - h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint) - cancel() - continue - } - req.Header.Set(xhttp.ContentType, "application/json") - - // Set user-agent to indicate MinIO release - // version to the configured log endpoint - req.Header.Set("User-Agent", h.config.UserAgent) - - if h.config.AuthToken != "" { - req.Header.Set("Authorization", h.config.AuthToken) - } - - client := http.Client{Transport: h.config.Transport} - resp, err := client.Do(req) - cancel() - if err != nil { - h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint) - continue - } - - // Drain any response. - xhttp.DrainBody(resp.Body) - - if !acceptedResponseStatusCode(resp.StatusCode) { - switch resp.StatusCode { - case http.StatusForbidden: - h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.config.Endpoint, resp.Status), h.config.Endpoint) - default: - h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.config.Endpoint, resp.Status), h.config.Endpoint) - } - } + h.logEntry(entry) } }() } @@ -184,6 +197,11 @@ func New(config Config) *Target { // Send log message 'e' to http target. func (h *Target) Send(entry interface{}, errKind string) error { + if atomic.LoadInt32(&h.status) == 0 { + // Channel was closed or used before init. + return nil + } + select { case h.logCh <- entry: default: @@ -194,3 +212,11 @@ func (h *Target) Send(entry interface{}, errKind string) error { return nil } + +// Cancel - cancels the target +func (h *Target) Cancel() { + if atomic.CompareAndSwapInt32(&h.status, 1, 0) { + close(h.logCh) + } + h.wg.Wait() +} diff --git a/internal/logger/target/kafka/kafka.go b/internal/logger/target/kafka/kafka.go index 52d42da17..7a8db3a8c 100644 --- a/internal/logger/target/kafka/kafka.go +++ b/internal/logger/target/kafka/kafka.go @@ -197,6 +197,10 @@ func (h *Target) Init() error { return nil } +// Cancel - cancels the target +func (h *Target) Cancel() { +} + // New initializes a new logger target which // sends log over http to the specified endpoint func New(config Config) *Target { diff --git a/internal/logger/targets.go b/internal/logger/targets.go index 1e9fc069e..035ffd1ec 100644 --- a/internal/logger/targets.go +++ b/internal/logger/targets.go @@ -20,6 +20,8 @@ package logger import ( "sync" "sync/atomic" + + "github.com/minio/minio/internal/logger/target/http" ) // Target is the entity that we will receive @@ -29,6 +31,7 @@ type Target interface { String() string Endpoint() string Init() error + Cancel() Send(entry interface{}, errKind string) error } @@ -108,3 +111,37 @@ func AddTarget(t Target) error { return nil } + +func cancelAllTargets() { + for _, tgt := range targets { + tgt.Cancel() + } +} + +func initTargets(cfg Config) (tgts []Target, err error) { + for _, l := range cfg.HTTP { + if l.Enabled { + t := http.New(l) + if err = t.Init(); err != nil { + return tgts, err + } + tgts = append(tgts, t) + } + } + return tgts, err +} + +// UpdateTargets swaps targets with newly loaded ones from the cfg +func UpdateTargets(cfg Config) error { + updated, err := initTargets(cfg) + if err != nil { + return err + } + + swapMu.Lock() + atomic.StoreInt32(&nTargets, int32(len(updated))) + cancelAllTargets() // cancel running targets + targets = updated + swapMu.Unlock() + return nil +}