diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index 79264d87c..50f9a2f09 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -1697,12 +1697,13 @@ type ReplicationPool struct {
 	activeWorkers    int32
 	activeMRFWorkers int32
 
-	objLayer ObjectLayer
-	ctx      context.Context
-	priority string
-	mu       sync.RWMutex
-	mrfMU    sync.Mutex
-	resyncer *replicationResyncer
+	objLayer   ObjectLayer
+	ctx        context.Context
+	priority   string
+	maxWorkers int
+	mu         sync.RWMutex
+	mrfMU      sync.Mutex
+	resyncer   *replicationResyncer
 
 	// workers:
 	workers []chan ReplicationWorkerOperation
@@ -1748,9 +1749,13 @@ const (
 func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool {
 	var workers, failedWorkers int
 	priority := "auto"
+	maxWorkers := WorkerMaxLimit
 	if opts.Priority != "" {
 		priority = opts.Priority
 	}
+	if opts.MaxWorkers > 0 {
+		maxWorkers = opts.MaxWorkers
+	}
 	switch priority {
 	case "fast":
 		workers = WorkerMaxLimit
@@ -1762,7 +1767,13 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
 		workers = WorkerAutoDefault
 		failedWorkers = MRFWorkerAutoDefault
 	}
+	if maxWorkers > 0 && workers > maxWorkers {
+		workers = maxWorkers
+	}
+	if maxWorkers > 0 && failedWorkers > maxWorkers {
+		failedWorkers = maxWorkers
+	}
 
 	pool := &ReplicationPool{
 		workers:    make([]chan ReplicationWorkerOperation, 0, workers),
 		lrgworkers: make([]chan ReplicationWorkerOperation, 0, LargeWorkerCount),
@@ -1774,6 +1785,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
 		ctx:        ctx,
 		objLayer:   o,
 		priority:   priority,
+		maxWorkers: maxWorkers,
 	}
 
 	pool.AddLargeWorkers()
@@ -1929,7 +1941,7 @@ func (p *ReplicationPool) ResizeWorkers(n, checkOld int) {
 }
 
 // ResizeWorkerPriority sets replication failed workers pool size
-func (p *ReplicationPool) ResizeWorkerPriority(pri string) {
+func (p *ReplicationPool) ResizeWorkerPriority(pri string, maxWorkers int) {
 	var workers, mrfWorkers int
 	p.mu.Lock()
 	switch pri {
@@ -1949,7 +1961,15 @@ func (p *ReplicationPool) ResizeWorkerPriority(pri string) {
 			mrfWorkers = int(math.Min(float64(p.mrfWorkerSize+1), MRFWorkerAutoDefault))
 		}
 	}
+	if maxWorkers > 0 && workers > maxWorkers {
+		workers = maxWorkers
+	}
+
+	if maxWorkers > 0 && mrfWorkers > maxWorkers {
+		mrfWorkers = maxWorkers
+	}
 	p.priority = pri
+	p.maxWorkers = maxWorkers
 	p.mu.Unlock()
 	p.ResizeWorkers(workers, 0)
 	p.ResizeFailedWorkers(mrfWorkers)
@@ -2023,6 +2043,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
 		globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
 		p.mu.RLock()
 		prio := p.priority
+		maxWorkers := p.maxWorkers
 		p.mu.RUnlock()
 		switch prio {
 		case "fast":
@@ -2030,16 +2051,18 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
 		case "slow":
 			logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming traffic - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
 		default:
-			if p.ActiveWorkers() < WorkerMaxLimit {
+			maxWorkers = int(math.Min(float64(maxWorkers), WorkerMaxLimit))
+			if p.ActiveWorkers() < maxWorkers {
 				p.mu.RLock()
-				workers := int(math.Min(float64(len(p.workers)+1), WorkerMaxLimit))
+				workers := int(math.Min(float64(len(p.workers)+1), float64(maxWorkers)))
 				existing := len(p.workers)
 				p.mu.RUnlock()
 				p.ResizeWorkers(workers, existing)
 			}
-			if p.ActiveMRFWorkers() < MRFWorkerMaxLimit {
+			maxMRFWorkers := int(math.Min(float64(maxWorkers), MRFWorkerMaxLimit))
+			if p.ActiveMRFWorkers() < maxMRFWorkers {
 				p.mu.RLock()
-				workers := int(math.Min(float64(p.mrfWorkerSize+1), MRFWorkerMaxLimit))
+				workers := int(math.Min(float64(p.mrfWorkerSize+1), float64(maxMRFWorkers)))
 				p.mu.RUnlock()
 				p.ResizeFailedWorkers(workers)
 			}
@@ -2077,6 +2100,7 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
 		globalReplicationPool.queueMRFSave(doi.ToMRFEntry())
 		p.mu.RLock()
 		prio := p.priority
+		maxWorkers := p.maxWorkers
 		p.mu.RUnlock()
 		switch prio {
 		case "fast":
@@ -2084,9 +2108,10 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
 		case "slow":
 			logger.LogOnceIf(GlobalContext, fmt.Errorf("WARNING: Unable to keep up with incoming deletes - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem))
 		default:
-			if p.ActiveWorkers() < WorkerMaxLimit {
+			maxWorkers = int(math.Min(float64(maxWorkers), WorkerMaxLimit))
+			if p.ActiveWorkers() < maxWorkers {
 				p.mu.RLock()
-				workers := int(math.Min(float64(len(p.workers)+1), WorkerMaxLimit))
+				workers := int(math.Min(float64(len(p.workers)+1), float64(maxWorkers)))
 				existing := len(p.workers)
 				p.mu.RUnlock()
 				p.ResizeWorkers(workers, existing)
@@ -2096,13 +2121,12 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
 	}
 
 type replicationPoolOpts struct {
-	Priority string
+	Priority   string
+	MaxWorkers int
 }
 
 func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
-	globalReplicationPool = NewReplicationPool(ctx, objectAPI, replicationPoolOpts{
-		Priority: globalAPIConfig.getReplicationPriority(),
-	})
+	globalReplicationPool = NewReplicationPool(ctx, objectAPI, globalAPIConfig.getReplicationOpts())
 	globalReplicationStats = NewReplicationStats(ctx, objectAPI)
 	go globalReplicationStats.trackEWMA()
 }
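Not part of the patch: the standalone Go sketch below summarizes the bound that NewReplicationPool, ResizeWorkerPriority, queueReplicaTask and queueReplicaDeleteTask now share. Only the two if-blocks mirror the patch; the helper name clampToMax and the example numbers in main are illustrative.

package main

import "fmt"

// clampToMax caps both the regular replication worker count and the MRF
// (failed-replication) worker count at maxWorkers whenever the new
// replication_max_workers setting is positive.
func clampToMax(workers, mrfWorkers, maxWorkers int) (int, int) {
	if maxWorkers > 0 && workers > maxWorkers {
		workers = maxWorkers
	}
	if maxWorkers > 0 && mrfWorkers > maxWorkers {
		mrfWorkers = maxWorkers
	}
	return workers, mrfWorkers
}

func main() {
	// Illustrative "auto"-priority defaults, clipped by a configured cap of 50.
	workers, mrfWorkers := clampToMax(100, 8, 50)
	fmt.Println(workers, mrfWorkers) // 50 8
}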
diff --git a/cmd/handler-api.go b/cmd/handler-api.go
index 8505c234e..3f592ffa1 100644
--- a/cmd/handler-api.go
+++ b/cmd/handler-api.go
@@ -42,9 +42,10 @@ type apiConfig struct {
 	listQuorum       string
 	corsAllowOrigins []string
 	// total drives per erasure set across pools.
-	totalDriveCount     int
-	replicationPriority string
-	transitionWorkers   int
+	totalDriveCount       int
+	replicationPriority   string
+	replicationMaxWorkers int
+	transitionWorkers     int
 
 	staleUploadsExpiry          time.Duration
 	staleUploadsCleanupInterval time.Duration
@@ -152,10 +153,11 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
 	}
 	t.listQuorum = listQuorum
 	if globalReplicationPool != nil &&
-		cfg.ReplicationPriority != t.replicationPriority {
-		globalReplicationPool.ResizeWorkerPriority(cfg.ReplicationPriority)
+		(cfg.ReplicationPriority != t.replicationPriority || cfg.ReplicationMaxWorkers != t.replicationMaxWorkers) {
+		globalReplicationPool.ResizeWorkerPriority(cfg.ReplicationPriority, cfg.ReplicationMaxWorkers)
 	}
 	t.replicationPriority = cfg.ReplicationPriority
+	t.replicationMaxWorkers = cfg.ReplicationMaxWorkers
 	if globalTransitionState != nil && cfg.TransitionWorkers != t.transitionWorkers {
 		globalTransitionState.UpdateWorkers(cfg.TransitionWorkers)
 	}
@@ -334,15 +336,21 @@ func maxClients(f http.HandlerFunc) http.HandlerFunc {
 	}
 }
 
-func (t *apiConfig) getReplicationPriority() string {
+func (t *apiConfig) getReplicationOpts() replicationPoolOpts {
 	t.mu.RLock()
 	defer t.mu.RUnlock()
 
 	if t.replicationPriority == "" {
-		return "auto"
+		return replicationPoolOpts{
+			Priority:   "auto",
+			MaxWorkers: WorkerMaxLimit,
+		}
 	}
 
-	return t.replicationPriority
+	return replicationPoolOpts{
+		Priority:   t.replicationPriority,
+		MaxWorkers: t.replicationMaxWorkers,
+	}
 }
 
 func (t *apiConfig) getTransitionWorkers() int {
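Not part of the patch: a compact sketch of the two handler-side pieces above, namely translating the API config into pool options and deciding when the running pool must be resized. Function names are illustrative, and the 500 fallback assumes WorkerMaxLimit equals the setting's new default.

package main

import "fmt"

// replicationPoolOpts mirrors the extended options struct from the patch.
type replicationPoolOpts struct {
	Priority   string
	MaxWorkers int
}

// optsFromConfig sketches apiConfig.getReplicationOpts: an unset priority
// falls back to "auto" with the full worker ceiling; otherwise both
// configured values are passed through unchanged.
func optsFromConfig(priority string, maxWorkers int) replicationPoolOpts {
	if priority == "" {
		return replicationPoolOpts{Priority: "auto", MaxWorkers: 500}
	}
	return replicationPoolOpts{Priority: priority, MaxWorkers: maxWorkers}
}

// needsResize sketches the updated condition in apiConfig.init: the pool is
// resized when either the priority or the max-workers cap has changed.
func needsResize(newPriority, oldPriority string, newMax, oldMax int) bool {
	return newPriority != oldPriority || newMax != oldMax
}

func main() {
	fmt.Printf("%+v\n", optsFromConfig("", 0))         // {Priority:auto MaxWorkers:500}
	fmt.Printf("%+v\n", optsFromConfig("auto", 200))   // {Priority:auto MaxWorkers:200}
	fmt.Println(needsResize("auto", "auto", 200, 500)) // true: only the cap changed
}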
diff --git a/internal/config/api/api.go b/internal/config/api/api.go
index 5cee446a0..03776c3d1 100644
--- a/internal/config/api/api.go
+++ b/internal/config/api/api.go
@@ -31,13 +31,15 @@ import (
 
 // API sub-system constants
 const (
-	apiRequestsMax             = "requests_max"
-	apiRequestsDeadline        = "requests_deadline"
-	apiClusterDeadline         = "cluster_deadline"
-	apiCorsAllowOrigin         = "cors_allow_origin"
-	apiRemoteTransportDeadline = "remote_transport_deadline"
-	apiListQuorum              = "list_quorum"
-	apiReplicationPriority     = "replication_priority"
+	apiRequestsMax             = "requests_max"
+	apiRequestsDeadline        = "requests_deadline"
+	apiClusterDeadline         = "cluster_deadline"
+	apiCorsAllowOrigin         = "cors_allow_origin"
+	apiRemoteTransportDeadline = "remote_transport_deadline"
+	apiListQuorum              = "list_quorum"
+	apiReplicationPriority     = "replication_priority"
+	apiReplicationMaxWorkers   = "replication_max_workers"
+	apiTransitionWorkers       = "transition_workers"
 
 	apiStaleUploadsCleanupInterval = "stale_uploads_cleanup_interval"
 	apiStaleUploadsExpiry          = "stale_uploads_expiry"
@@ -48,16 +50,16 @@ const (
 	apiRootAccess = "root_access"
 	apiSyncEvents = "sync_events"
 
-	EnvAPIRequestsMax             = "MINIO_API_REQUESTS_MAX"
-	EnvAPIRequestsDeadline        = "MINIO_API_REQUESTS_DEADLINE"
-	EnvAPIClusterDeadline         = "MINIO_API_CLUSTER_DEADLINE"
-	EnvAPICorsAllowOrigin         = "MINIO_API_CORS_ALLOW_ORIGIN"
-	EnvAPIRemoteTransportDeadline = "MINIO_API_REMOTE_TRANSPORT_DEADLINE"
-	EnvAPITransitionWorkers       = "MINIO_API_TRANSITION_WORKERS"
-	EnvAPIListQuorum              = "MINIO_API_LIST_QUORUM"
-	EnvAPISecureCiphers           = "MINIO_API_SECURE_CIPHERS" // default config.EnableOn
-	EnvAPIReplicationPriority     = "MINIO_API_REPLICATION_PRIORITY"
-
+	EnvAPIRequestsMax                 = "MINIO_API_REQUESTS_MAX"
+	EnvAPIRequestsDeadline            = "MINIO_API_REQUESTS_DEADLINE"
+	EnvAPIClusterDeadline             = "MINIO_API_CLUSTER_DEADLINE"
+	EnvAPICorsAllowOrigin             = "MINIO_API_CORS_ALLOW_ORIGIN"
+	EnvAPIRemoteTransportDeadline     = "MINIO_API_REMOTE_TRANSPORT_DEADLINE"
+	EnvAPITransitionWorkers           = "MINIO_API_TRANSITION_WORKERS"
+	EnvAPIListQuorum                  = "MINIO_API_LIST_QUORUM"
+	EnvAPISecureCiphers               = "MINIO_API_SECURE_CIPHERS" // default config.EnableOn
+	EnvAPIReplicationPriority         = "MINIO_API_REPLICATION_PRIORITY"
+	EnvAPIReplicationMaxWorkers       = "MINIO_API_REPLICATION_MAX_WORKERS"
 	EnvAPIStaleUploadsCleanupInterval = "MINIO_API_STALE_UPLOADS_CLEANUP_INTERVAL"
 	EnvAPIStaleUploadsExpiry          = "MINIO_API_STALE_UPLOADS_EXPIRY"
 	EnvAPIDeleteCleanupInterval       = "MINIO_API_DELETE_CLEANUP_INTERVAL"
@@ -107,6 +109,10 @@ var (
 			Key:   apiReplicationPriority,
 			Value: "auto",
 		},
+		config.KV{
+			Key:   apiReplicationMaxWorkers,
+			Value: "500",
+		},
 		config.KV{
 			Key:   apiTransitionWorkers,
 			Value: "100",
@@ -156,6 +162,7 @@ type Config struct {
 	RemoteTransportDeadline     time.Duration `json:"remote_transport_deadline"`
 	ListQuorum                  string        `json:"list_quorum"`
 	ReplicationPriority         string        `json:"replication_priority"`
+	ReplicationMaxWorkers       int           `json:"replication_max_workers"`
 	TransitionWorkers           int           `json:"transition_workers"`
 	StaleUploadsCleanupInterval time.Duration `json:"stale_uploads_cleanup_interval"`
 	StaleUploadsExpiry          time.Duration `json:"stale_uploads_expiry"`
@@ -259,7 +266,15 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
 		return cfg, fmt.Errorf("invalid value %v for replication_priority", replicationPriority)
 	}
 	cfg.ReplicationPriority = replicationPriority
+	replicationMaxWorkers, err := strconv.Atoi(env.Get(EnvAPIReplicationMaxWorkers, kvs.GetWithDefault(apiReplicationMaxWorkers, DefaultKVS)))
+	if err != nil {
+		return cfg, err
+	}
+	if replicationMaxWorkers <= 0 || replicationMaxWorkers > 500 {
+		return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Number of replication workers should be between 1 and 500")
+	}
+	cfg.ReplicationMaxWorkers = replicationMaxWorkers
 
 	transitionWorkers, err := strconv.Atoi(env.Get(EnvAPITransitionWorkers, kvs.GetWithDefault(apiTransitionWorkers, DefaultKVS)))
 	if err != nil {
 		return cfg, err
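Not part of the patch: a standalone version of the check LookupConfig now performs on the new key; the function name is illustrative, the bounds and message mirror the hunk above.

package main

import (
	"fmt"
	"strconv"
)

// parseReplicationMaxWorkers applies the same rule as LookupConfig: the value
// (from MINIO_API_REPLICATION_MAX_WORKERS or the api subsystem's
// replication_max_workers key, default "500") must be an integer in [1, 500].
func parseReplicationMaxWorkers(raw string) (int, error) {
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 0, err
	}
	if n <= 0 || n > 500 {
		return 0, fmt.Errorf("number of replication workers should be between 1 and 500, got %d", n)
	}
	return n, nil
}

func main() {
	fmt.Println(parseReplicationMaxWorkers("200")) // 200 <nil>
	fmt.Println(parseReplicationMaxWorkers("0"))   // 0 and a range error
}

In practice the cap would presumably be set either through MINIO_API_REPLICATION_MAX_WORKERS or, analogous to the `mc admin config set api replication_priority=...` hint already present in the pool code, via `mc admin config set <alias> api replication_max_workers=<n>`.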
"MINIO_API_TRANSITION_WORKERS" + EnvAPIListQuorum = "MINIO_API_LIST_QUORUM" + EnvAPISecureCiphers = "MINIO_API_SECURE_CIPHERS" // default config.EnableOn + EnvAPIReplicationPriority = "MINIO_API_REPLICATION_PRIORITY" + EnvAPIReplicationMaxWorkers = "MINIO_API_REPLICATION_MAX_WORKERS" EnvAPIStaleUploadsCleanupInterval = "MINIO_API_STALE_UPLOADS_CLEANUP_INTERVAL" EnvAPIStaleUploadsExpiry = "MINIO_API_STALE_UPLOADS_EXPIRY" EnvAPIDeleteCleanupInterval = "MINIO_API_DELETE_CLEANUP_INTERVAL" @@ -107,6 +109,10 @@ var ( Key: apiReplicationPriority, Value: "auto", }, + config.KV{ + Key: apiReplicationMaxWorkers, + Value: "500", + }, config.KV{ Key: apiTransitionWorkers, Value: "100", @@ -156,6 +162,7 @@ type Config struct { RemoteTransportDeadline time.Duration `json:"remote_transport_deadline"` ListQuorum string `json:"list_quorum"` ReplicationPriority string `json:"replication_priority"` + ReplicationMaxWorkers int `json:"replication_max_workers"` TransitionWorkers int `json:"transition_workers"` StaleUploadsCleanupInterval time.Duration `json:"stale_uploads_cleanup_interval"` StaleUploadsExpiry time.Duration `json:"stale_uploads_expiry"` @@ -259,7 +266,15 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { return cfg, fmt.Errorf("invalid value %v for replication_priority", replicationPriority) } cfg.ReplicationPriority = replicationPriority + replicationMaxWorkers, err := strconv.Atoi(env.Get(EnvAPIReplicationMaxWorkers, kvs.GetWithDefault(apiReplicationMaxWorkers, DefaultKVS))) + if err != nil { + return cfg, err + } + if replicationMaxWorkers <= 0 || replicationMaxWorkers > 500 { + return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Number of replication workers should be between 1 and 500") + } + cfg.ReplicationMaxWorkers = replicationMaxWorkers transitionWorkers, err := strconv.Atoi(env.Get(EnvAPITransitionWorkers, kvs.GetWithDefault(apiTransitionWorkers, DefaultKVS))) if err != nil { return cfg, err diff --git a/internal/config/api/help.go b/internal/config/api/help.go index d87fdef2c..44840bb1a 100644 --- a/internal/config/api/help.go +++ b/internal/config/api/help.go @@ -68,6 +68,12 @@ var ( Optional: true, Type: "string", }, + config.HelpKV{ + Key: apiReplicationMaxWorkers, + Description: `set the maximum number of replication workers` + defaultHelpPostfix(apiReplicationMaxWorkers), + Optional: true, + Type: "number", + }, config.HelpKV{ Key: apiTransitionWorkers, Description: `set the number of transition workers` + defaultHelpPostfix(apiTransitionWorkers),