diff --git a/cmd/config-current.go b/cmd/config-current.go index 83ca6d5c4..1be6725ad 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -386,7 +386,7 @@ func validateSubSysConfig(ctx context.Context, s config.Config, subSys string, o } default: if config.LoggerSubSystems.Contains(subSys) { - if err := logger.ValidateSubSysConfig(s, subSys); err != nil { + if err := logger.ValidateSubSysConfig(ctx, s, subSys); err != nil { return err } } @@ -575,7 +575,7 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf scannerCycle.Store(scannerCfg.Cycle) logger.LogIf(ctx, scannerSleeper.Update(scannerCfg.Delay, scannerCfg.MaxWait)) case config.LoggerWebhookSubSys: - loggerCfg, err := logger.LookupConfigForSubSys(s, config.LoggerWebhookSubSys) + loggerCfg, err := logger.LookupConfigForSubSys(ctx, s, config.LoggerWebhookSubSys) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to load logger webhook config: %w", err)) } @@ -592,7 +592,7 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %v", errs)) } case config.AuditWebhookSubSys: - loggerCfg, err := logger.LookupConfigForSubSys(s, config.AuditWebhookSubSys) + loggerCfg, err := logger.LookupConfigForSubSys(ctx, s, config.AuditWebhookSubSys) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to load audit webhook config: %w", err)) } @@ -610,7 +610,7 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf logger.LogIf(ctx, fmt.Errorf("Unable to update audit webhook targets: %v", errs)) } case config.AuditKafkaSubSys: - loggerCfg, err := logger.LookupConfigForSubSys(s, config.AuditKafkaSubSys) + loggerCfg, err := logger.LookupConfigForSubSys(ctx, s, config.AuditKafkaSubSys) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to load audit kafka config: %w", err)) } diff --git a/internal/logger/config.go b/internal/logger/config.go index 7eb5265f6..bc4580fb1 100644 --- a/internal/logger/config.go +++ b/internal/logger/config.go @@ -18,6 +18,7 @@ package logger import ( + "context" "crypto/tls" "errors" "strconv" @@ -98,6 +99,8 @@ const ( auditTargetNamePrefix = "audit-" ) +var errInvalidQueueSize = errors.New("invalid queue_size value") + // Default KVS for loggerHTTP and loggerAuditHTTP var ( DefaultLoggerWebhookKVS = config.KVS{ @@ -260,7 +263,7 @@ func getCfgVal(envName, key, defaultValue string) string { return env.Get(envName, defaultValue) } -func lookupLegacyConfigForSubSys(subSys string) Config { +func lookupLegacyConfigForSubSys(ctx context.Context, subSys string) Config { cfg := NewConfig() switch subSys { case config.LoggerWebhookSubSys: @@ -280,9 +283,14 @@ func lookupLegacyConfigForSubSys(subSys string) Config { if endpoint == "" { continue } + url, err := xnet.ParseHTTPURL(endpoint) + if err != nil { + LogOnceIf(ctx, err, "logger-webhook-"+endpoint) + continue + } cfg.HTTP[target] = http.Config{ Enabled: true, - Endpoint: endpoint, + Endpoint: url, } } @@ -303,9 +311,14 @@ func lookupLegacyConfigForSubSys(subSys string) Config { if endpoint == "" { continue } + url, err := xnet.ParseHTTPURL(endpoint) + if err != nil { + LogOnceIf(ctx, err, "audit-webhook-"+endpoint) + continue + } cfg.AuditWebhook[target] = http.Config{ Enabled: true, - Endpoint: endpoint, + Endpoint: url, } } @@ -373,7 +386,7 @@ func lookupAuditKafkaConfig(scfg config.Config, cfg Config) (Config, error) { return cfg, err } if queueSize <= 0 { - return cfg, errors.New("invalid queue_size value") + return cfg, errInvalidQueueSize } kafkaArgs.QueueSize = queueSize @@ -384,214 +397,121 @@ func lookupAuditKafkaConfig(scfg config.Config, cfg Config) (Config, error) { } func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) { - envs := env.List(EnvLoggerWebhookEndpoint) - var loggerTargets []string - for _, k := range envs { - target := strings.TrimPrefix(k, EnvLoggerWebhookEndpoint+config.Default) - if target == EnvLoggerWebhookEndpoint { - target = config.Default - } - loggerTargets = append(loggerTargets, target) - } - - // Load HTTP logger from the environment if found - for _, target := range loggerTargets { - if v, ok := cfg.HTTP[target]; ok && v.Enabled { + for k, kv := range config.Merge(scfg[config.LoggerWebhookSubSys], EnvLoggerWebhookEnable, DefaultLoggerWebhookKVS) { + if v, ok := cfg.HTTP[k]; ok && v.Enabled { // This target is already enabled using the // legacy environment variables, ignore. continue } - - enableCfgVal := getCfgVal(EnvLoggerWebhookEnable, target, "") - enable, err := config.ParseBool(enableCfgVal) - if err != nil || !enable { - continue - } - - clientCert := getCfgVal(EnvLoggerWebhookClientCert, target, "") - clientKey := getCfgVal(EnvLoggerWebhookClientKey, target, "") - err = config.EnsureCertAndKey(clientCert, clientKey) - if err != nil { - return cfg, err - } - - queueSizeCfgVal := getCfgVal(EnvLoggerWebhookQueueSize, target, "100000") - queueSize, err := strconv.Atoi(queueSizeCfgVal) - if err != nil { - return cfg, err - } - if queueSize <= 0 { - return cfg, errors.New("invalid queue_size value") - } - - cfg.HTTP[target] = http.Config{ - Enabled: true, - Endpoint: getCfgVal(EnvLoggerWebhookEndpoint, target, ""), - AuthToken: getCfgVal(EnvLoggerWebhookAuthToken, target, ""), - ClientCert: clientCert, - ClientKey: clientKey, - Proxy: getCfgVal(EnvLoggerWebhookProxy, target, ""), - QueueSize: queueSize, - QueueDir: getCfgVal(EnvLoggerWebhookQueueDir, target, ""), - Name: loggerTargetNamePrefix + target, - } - } - - for starget, kv := range scfg[config.LoggerWebhookSubSys] { - if l, ok := cfg.HTTP[starget]; ok && l.Enabled { - // Ignore this HTTP logger config since there is - // a target with the same name loaded and enabled - // from the environment. - continue - } subSysTarget := config.LoggerWebhookSubSys - if starget != config.Default { - subSysTarget = config.LoggerWebhookSubSys + config.SubSystemSeparator + starget + if k != config.Default { + subSysTarget = config.LoggerWebhookSubSys + config.SubSystemSeparator + k } if err := config.CheckValidKeys(subSysTarget, kv, DefaultLoggerWebhookKVS); err != nil { return cfg, err } - enabled, err := config.ParseBool(kv.Get(config.Enable)) - if err != nil { - return cfg, err - } - if !enabled { - continue - } - err = config.EnsureCertAndKey(kv.Get(ClientCert), kv.Get(ClientKey)) - if err != nil { - return cfg, err - } - queueSize, err := strconv.Atoi(kv.Get(QueueSize)) - if err != nil { - return cfg, err - } - if queueSize <= 0 { - return cfg, errors.New("invalid queue_size value") - } - cfg.HTTP[starget] = http.Config{ - Enabled: true, - Endpoint: kv.Get(Endpoint), - AuthToken: kv.Get(AuthToken), - ClientCert: kv.Get(ClientCert), - ClientKey: kv.Get(ClientKey), - Proxy: kv.Get(Proxy), - QueueSize: queueSize, - QueueDir: kv.Get(QueueDir), - Name: loggerTargetNamePrefix + starget, - } - } - - return cfg, nil -} - -func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) { - var loggerAuditTargets []string - envs := env.List(EnvAuditWebhookEndpoint) - for _, k := range envs { - target := strings.TrimPrefix(k, EnvAuditWebhookEndpoint+config.Default) - if target == EnvAuditWebhookEndpoint { - target = config.Default - } - loggerAuditTargets = append(loggerAuditTargets, target) - } - - for _, target := range loggerAuditTargets { - if v, ok := cfg.AuditWebhook[target]; ok && v.Enabled { - // This target is already enabled using the - // legacy environment variables, ignore. - continue - } - enable, err := config.ParseBool(getCfgVal(EnvAuditWebhookEnable, target, "")) + enableCfgVal := getCfgVal(EnvLoggerWebhookEnable, k, kv.Get(config.Enable)) + enable, err := config.ParseBool(enableCfgVal) if err != nil || !enable { continue } - - clientCert := getCfgVal(EnvAuditWebhookClientCert, target, "") - clientKey := getCfgVal(EnvAuditWebhookClientKey, target, "") + var url *xnet.URL + endpoint := getCfgVal(EnvLoggerWebhookEndpoint, k, kv.Get(Endpoint)) + url, err = xnet.ParseHTTPURL(endpoint) + if err != nil { + return cfg, err + } + clientCert := getCfgVal(EnvLoggerWebhookClientCert, k, kv.Get(ClientCert)) + clientKey := getCfgVal(EnvLoggerWebhookClientKey, k, kv.Get(ClientKey)) err = config.EnsureCertAndKey(clientCert, clientKey) if err != nil { return cfg, err } - - queueSizeCfgVal := getCfgVal(EnvAuditWebhookQueueSize, target, "100000") + queueSizeCfgVal := getCfgVal(EnvLoggerWebhookQueueSize, k, kv.Get(QueueSize)) queueSize, err := strconv.Atoi(queueSizeCfgVal) if err != nil { return cfg, err } if queueSize <= 0 { - return cfg, errors.New("invalid queue_size value") + return cfg, errInvalidQueueSize } - - cfg.AuditWebhook[target] = http.Config{ + cfg.HTTP[k] = http.Config{ Enabled: true, - Endpoint: getCfgVal(EnvAuditWebhookEndpoint, target, ""), - AuthToken: getCfgVal(EnvAuditWebhookAuthToken, target, ""), + Endpoint: url, + AuthToken: getCfgVal(EnvLoggerWebhookAuthToken, k, kv.Get(AuthToken)), ClientCert: clientCert, ClientKey: clientKey, + Proxy: getCfgVal(EnvLoggerWebhookProxy, k, kv.Get(Proxy)), QueueSize: queueSize, - QueueDir: getCfgVal(EnvAuditWebhookQueueDir, target, ""), - Name: auditTargetNamePrefix + target, + QueueDir: getCfgVal(EnvLoggerWebhookQueueDir, k, kv.Get(QueueDir)), + Name: loggerTargetNamePrefix + k, } } + return cfg, nil +} - for starget, kv := range scfg[config.AuditWebhookSubSys] { - if l, ok := cfg.AuditWebhook[starget]; ok && l.Enabled { - // Ignore this audit config since another target - // with the same name is already loaded and enabled - // in the shell environment. +func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) { + for k, kv := range config.Merge(scfg[config.AuditWebhookSubSys], EnvAuditWebhookEnable, DefaultAuditWebhookKVS) { + if v, ok := cfg.AuditWebhook[k]; ok && v.Enabled { + // This target is already enabled using the + // legacy environment variables, ignore. continue } subSysTarget := config.AuditWebhookSubSys - if starget != config.Default { - subSysTarget = config.AuditWebhookSubSys + config.SubSystemSeparator + starget + if k != config.Default { + subSysTarget = config.AuditWebhookSubSys + config.SubSystemSeparator + k } if err := config.CheckValidKeys(subSysTarget, kv, DefaultAuditWebhookKVS); err != nil { return cfg, err } - enabled, err := config.ParseBool(kv.Get(config.Enable)) - if err != nil { - return cfg, err - } - if !enabled { + enable, err := config.ParseBool(getCfgVal(EnvAuditWebhookEnable, k, kv.Get(config.Enable))) + if err != nil || !enable { continue } - err = config.EnsureCertAndKey(kv.Get(ClientCert), kv.Get(ClientKey)) + var url *xnet.URL + endpoint := getCfgVal(EnvAuditWebhookEndpoint, k, kv.Get(Endpoint)) + url, err = xnet.ParseHTTPURL(endpoint) if err != nil { return cfg, err } - queueSize, err := strconv.Atoi(kv.Get(QueueSize)) + clientCert := getCfgVal(EnvAuditWebhookClientCert, k, kv.Get(ClientCert)) + clientKey := getCfgVal(EnvAuditWebhookClientKey, k, kv.Get(ClientKey)) + err = config.EnsureCertAndKey(clientCert, clientKey) + if err != nil { + return cfg, err + } + queueSizeCfgVal := getCfgVal(EnvAuditWebhookQueueSize, k, kv.Get(QueueSize)) + queueSize, err := strconv.Atoi(queueSizeCfgVal) if err != nil { return cfg, err } if queueSize <= 0 { - return cfg, errors.New("invalid queue_size value") + return cfg, errInvalidQueueSize } - cfg.AuditWebhook[starget] = http.Config{ + cfg.AuditWebhook[k] = http.Config{ Enabled: true, - Endpoint: kv.Get(Endpoint), - AuthToken: kv.Get(AuthToken), - ClientCert: kv.Get(ClientCert), - ClientKey: kv.Get(ClientKey), + Endpoint: url, + AuthToken: getCfgVal(EnvAuditWebhookAuthToken, k, kv.Get(AuthToken)), + ClientCert: clientCert, + ClientKey: clientKey, QueueSize: queueSize, - QueueDir: kv.Get(QueueDir), - Name: auditTargetNamePrefix + starget, + QueueDir: getCfgVal(EnvAuditWebhookQueueDir, k, kv.Get(QueueDir)), + Name: auditTargetNamePrefix + k, } } - return cfg, nil } // LookupConfigForSubSys - lookup logger config, override with ENVs if set, for the given sub-system -func LookupConfigForSubSys(scfg config.Config, subSys string) (cfg Config, err error) { +func LookupConfigForSubSys(ctx context.Context, scfg config.Config, subSys string) (cfg Config, err error) { switch subSys { case config.LoggerWebhookSubSys: - cfg = lookupLegacyConfigForSubSys(config.LoggerWebhookSubSys) + cfg = lookupLegacyConfigForSubSys(ctx, config.LoggerWebhookSubSys) if cfg, err = lookupLoggerWebhookConfig(scfg, cfg); err != nil { return cfg, err } case config.AuditWebhookSubSys: - cfg = lookupLegacyConfigForSubSys(config.AuditWebhookSubSys) + cfg = lookupLegacyConfigForSubSys(ctx, config.AuditWebhookSubSys) if cfg, err = lookupAuditWebhookConfig(scfg, cfg); err != nil { return cfg, err } @@ -605,8 +525,8 @@ func LookupConfigForSubSys(scfg config.Config, subSys string) (cfg Config, err e } // ValidateSubSysConfig - validates logger related config of given sub-system -func ValidateSubSysConfig(scfg config.Config, subSys string) error { +func ValidateSubSysConfig(ctx context.Context, scfg config.Config, subSys string) error { // Lookup for legacy environment variables first - _, err := LookupConfigForSubSys(scfg, subSys) + _, err := LookupConfigForSubSys(ctx, scfg, subSys) return err } diff --git a/internal/logger/legacy.go b/internal/logger/legacy.go index e9b870c4a..131c4ffd5 100644 --- a/internal/logger/legacy.go +++ b/internal/logger/legacy.go @@ -41,7 +41,7 @@ func SetLoggerHTTPAudit(scfg config.Config, k string, args http.Config) { }, config.KV{ Key: Endpoint, - Value: args.Endpoint, + Value: args.Endpoint.String(), }, config.KV{ Key: AuthToken, @@ -64,7 +64,7 @@ func SetLoggerHTTP(scfg config.Config, k string, args http.Config) { }, config.KV{ Key: Endpoint, - Value: args.Endpoint, + Value: args.Endpoint.String(), }, config.KV{ Key: AuthToken, diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index 4ff666a1f..5eff28142 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -41,7 +41,7 @@ import ( const ( // Timeout for the webhook http call - webhookCallTimeout = 5 * time.Second + webhookCallTimeout = 3 * time.Second // maxWorkers is the maximum number of concurrent http loggers maxWorkers = 16 @@ -61,7 +61,7 @@ type Config struct { Enabled bool `json:"enabled"` Name string `json:"name"` UserAgent string `json:"userAgent"` - Endpoint string `json:"endpoint"` + Endpoint *xnet.URL `json:"endpoint"` AuthToken string `json:"authToken"` ClientCert string `json:"clientCert"` ClientKey string `json:"clientKey"` @@ -119,7 +119,7 @@ func (h *Target) Name() string { // Endpoint returns the backend endpoint func (h *Target) Endpoint() string { - return h.config.Endpoint + return h.config.Endpoint.String() } func (h *Target) String() string { @@ -128,8 +128,8 @@ func (h *Target) String() string { // IsOnline returns true if the target is reachable. func (h *Target) IsOnline(ctx context.Context) bool { - if err := h.checkAlive(ctx); err != nil { - return !xnet.IsNetworkOrHostDown(err, false) + if err := h.send(ctx, []byte(`{}`), webhookCallTimeout); err != nil { + return !xnet.IsNetworkOrHostDown(err, false) && !xnet.IsConnRefusedErr(err) } return true } @@ -148,11 +148,6 @@ func (h *Target) Stats() types.TargetStats { return stats } -// This will check if we can reach the remote. -func (h *Target) checkAlive(ctx context.Context) (err error) { - return h.send(ctx, []byte(`{}`), webhookCallTimeout) -} - // Init validate and initialize the http target func (h *Target) Init(ctx context.Context) (err error) { if h.config.QueueDir != "" { @@ -224,9 +219,9 @@ func (h *Target) send(ctx context.Context, payload []byte, timeout time.Duration ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodPost, - h.config.Endpoint, bytes.NewReader(payload)) + h.Endpoint(), bytes.NewReader(payload)) if err != nil { - return fmt.Errorf("invalid configuration for '%s'; %v", h.config.Endpoint, err) + return fmt.Errorf("invalid configuration for '%s'; %v", h.Endpoint(), err) } req.Header.Set(xhttp.ContentType, "application/json") req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion) @@ -242,7 +237,7 @@ func (h *Target) send(ctx context.Context, payload []byte, timeout time.Duration resp, err := h.client.Do(req) if err != nil { - return fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err) + return fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.Endpoint(), err) } // Drain any response. @@ -253,9 +248,9 @@ func (h *Target) send(ctx context.Context, payload []byte, timeout time.Duration // accepted HTTP status codes. return nil case http.StatusForbidden: - return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.config.Endpoint, resp.Status) + return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.Endpoint(), resp.Status) default: - return fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.config.Endpoint, resp.Status) + return fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.Endpoint(), resp.Status) } } @@ -282,7 +277,7 @@ func (h *Target) logEntry(ctx context.Context, entry interface{}) { } tries++ if err := h.send(ctx, logJSON, webhookCallTimeout); err != nil { - h.config.LogOnce(ctx, err, h.config.Endpoint) + h.config.LogOnce(ctx, err, h.Endpoint()) atomic.AddInt64(&h.failedMessages, 1) } else { return