From f1302c40fe0fe9effa6f4e8208cc8a420fe3acfa Mon Sep 17 00:00:00 2001
From: Klaus Post
Date: Thu, 15 Aug 2024 05:04:40 -0700
Subject: [PATCH] Fix uninitialized replication stats (#20260)

Services are unfrozen before `initBackgroundReplication` has finished, so the
write to `globalReplicationStats` is racy. Switch it to an atomic pointer.

Provide the `ReplicationPool` with the stats directly, so they don't have to be
grabbed from the atomic pointer on every use.

All other loads are nil-checked, and calls return empty values while the stats
haven't been initialized yet.
---
 cmd/bucket-handlers.go               |  2 +-
 cmd/bucket-replication-handlers.go   |  6 +-
 cmd/bucket-replication-metrics.go    |  2 +-
 cmd/bucket-replication-stats.go      | 16 ++++-
 cmd/bucket-replication.go            | 90 +++++++++++++++-------------
 cmd/bucket-stats.go                  |  4 +-
 cmd/handler-api.go                   |  4 +-
 cmd/metrics-v2.go                    |  6 +-
 cmd/metrics-v3-bucket-replication.go |  2 +-
 cmd/metrics-v3-replication.go        |  5 +-
 cmd/metrics.go                       |  2 +-
 cmd/notification.go                  | 22 +++----
 cmd/object-handlers.go               | 22 +++----
 cmd/object-multipart-handlers.go     |  2 +-
 cmd/peer-rest-server.go              | 27 +++++----
 cmd/server-main.go                   |  2 +-
 cmd/site-replication.go              | 12 ++--
 cmd/test-utils_test.go               |  4 +-
 internal/once/singleton.go           | 46 ++++++++++++++
 19 files changed, 175 insertions(+), 101 deletions(-)
 create mode 100644 internal/once/singleton.go

diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go
index 81b58ead6..13bed4ab8 100644
--- a/cmd/bucket-handlers.go
+++ b/cmd/bucket-handlers.go
@@ -1682,7 +1682,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
 	}
 
 	globalNotificationSys.DeleteBucketMetadata(ctx, bucket)
-	globalReplicationPool.deleteResyncMetadata(ctx, bucket)
+	globalReplicationPool.Get().deleteResyncMetadata(ctx, bucket)
 
 	// Call site replication hook.
replLogIf(ctx, globalSiteReplicationSys.DeleteBucketHook(ctx, bucket, forceDelete)) diff --git a/cmd/bucket-replication-handlers.go b/cmd/bucket-replication-handlers.go index f593f7666..05a9d2e85 100644 --- a/cmd/bucket-replication-handlers.go +++ b/cmd/bucket-replication-handlers.go @@ -230,7 +230,7 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseW w.Header().Set(xhttp.ContentType, string(mimeJSON)) enc := json.NewEncoder(w) - stats := globalReplicationStats.getLatestReplicationStats(bucket) + stats := globalReplicationStats.Load().getLatestReplicationStats(bucket) bwRpt := globalNotificationSys.GetBandwidthReports(ctx, bucket) bwMap := bwRpt.BucketStats for arn, st := range stats.ReplicationStats.Stats { @@ -286,7 +286,7 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsV2Handler(w http.Respons w.Header().Set(xhttp.ContentType, string(mimeJSON)) enc := json.NewEncoder(w) - stats := globalReplicationStats.getLatestReplicationStats(bucket) + stats := globalReplicationStats.Load().getLatestReplicationStats(bucket) bwRpt := globalNotificationSys.GetBandwidthReports(ctx, bucket) bwMap := bwRpt.BucketStats for arn, st := range stats.ReplicationStats.Stats { @@ -422,7 +422,7 @@ func (api objectAPIHandlers) ResetBucketReplicationStartHandler(w http.ResponseW return } - if err := globalReplicationPool.resyncer.start(ctx, objectAPI, resyncOpts{ + if err := globalReplicationPool.Get().resyncer.start(ctx, objectAPI, resyncOpts{ bucket: bucket, arn: arn, resyncID: resetID, diff --git a/cmd/bucket-replication-metrics.go b/cmd/bucket-replication-metrics.go index 3b3b56af6..aa4cfb963 100644 --- a/cmd/bucket-replication-metrics.go +++ b/cmd/bucket-replication-metrics.go @@ -119,7 +119,7 @@ func (a *ActiveWorkerStat) update() { if a == nil { return } - a.Curr = globalReplicationPool.ActiveWorkers() + a.Curr = globalReplicationPool.Get().ActiveWorkers() a.hist.Update(int64(a.Curr)) a.Avg = float32(a.hist.Mean()) a.Max = int(a.hist.Max()) diff --git a/cmd/bucket-replication-stats.go b/cmd/bucket-replication-stats.go index 1df4a6827..d20a0ff47 100644 --- a/cmd/bucket-replication-stats.go +++ b/cmd/bucket-replication-stats.go @@ -87,6 +87,9 @@ func (r *ReplicationStats) updateMovingAvg() { // ActiveWorkers returns worker stats func (r *ReplicationStats) ActiveWorkers() ActiveWorkerStat { + if r == nil { + return ActiveWorkerStat{} + } r.wlock.RLock() defer r.wlock.RUnlock() w := r.workers.get() @@ -351,6 +354,9 @@ func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *Replicatio } func (r *ReplicationStats) getAllLatest(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketStats) { + if r == nil { + return nil + } peerBucketStatsList := globalNotificationSys.GetClusterAllBucketStats(GlobalContext) bucketsReplicationStats = make(map[string]BucketStats, len(bucketsUsage)) @@ -460,6 +466,9 @@ func (r *ReplicationStats) calculateBucketReplicationStats(bucket string, bucket // get the most current of in-memory replication stats and data usage info from crawler. 
func (r *ReplicationStats) getLatestReplicationStats(bucket string) (s BucketStats) { + if r == nil { + return s + } bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket) return r.calculateBucketReplicationStats(bucket, bucketStats) } @@ -495,9 +504,14 @@ func (r *ReplicationStats) decQ(bucket string, sz int64, isDelMarker bool, opTyp // incProxy increments proxy metrics for proxied calls func (r *ReplicationStats) incProxy(bucket string, api replProxyAPI, isErr bool) { - r.pCache.inc(bucket, api, isErr) + if r != nil { + r.pCache.inc(bucket, api, isErr) + } } func (r *ReplicationStats) getProxyStats(bucket string) ProxyMetric { + if r == nil { + return ProxyMetric{} + } return r.pCache.getBucketStats(bucket) } diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index b7c63b044..5902c3ee6 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -50,6 +50,7 @@ import ( xhttp "github.com/minio/minio/internal/http" xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/once" "github.com/tinylib/msgp/msgp" "github.com/zeebo/xxh3" "golang.org/x/exp/maps" @@ -478,7 +479,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+dobj.ObjectName) lkctx, err := lk.GetLock(ctx, globalOperationTimeout) if err != nil { - globalReplicationPool.queueMRFSave(dobj.ToMRFEntry()) + globalReplicationPool.Get().queueMRFSave(dobj.ToMRFEntry()) sendEvent(eventArgs{ BucketName: bucket, Object: ObjectInfo{ @@ -548,7 +549,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj // to decrement pending count later. for _, rinfo := range rinfos.Targets { if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus { - globalReplicationStats.Update(dobj.Bucket, rinfo, replicationStatus, + globalReplicationStats.Load().Update(dobj.Bucket, rinfo, replicationStatus, prevStatus) } } @@ -556,7 +557,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj eventName := event.ObjectReplicationComplete if replicationStatus == replication.Failed { eventName = event.ObjectReplicationFailed - globalReplicationPool.queueMRFSave(dobj.ToMRFEntry()) + globalReplicationPool.Get().queueMRFSave(dobj.ToMRFEntry()) } drs := getReplicationState(rinfos, dobj.ReplicationState, dobj.VersionID) if replicationStatus != prevStatus { @@ -1054,7 +1055,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje UserAgent: "Internal: [Replication]", Host: globalLocalNodeName, }) - globalReplicationPool.queueMRFSave(ri.ToMRFEntry()) + globalReplicationPool.Get().queueMRFSave(ri.ToMRFEntry()) return } ctx = lkctx.Context() @@ -1139,7 +1140,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje for _, rinfo := range rinfos.Targets { if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus { rinfo.OpType = opType // update optype to reflect correct operation. 
-				globalReplicationStats.Update(bucket, rinfo, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus)
+				globalReplicationStats.Load().Update(bucket, rinfo, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus)
 			}
 		}
 	}
@@ -1159,7 +1160,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
 		ri.EventType = ReplicateMRF
 		ri.ReplicationStatusInternal = rinfos.ReplicationStatusInternal()
 		ri.RetryCount++
-		globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
+		globalReplicationPool.Get().queueMRFSave(ri.ToMRFEntry())
 	}
 }
 
@@ -1787,8 +1788,8 @@ const (
 )
 
 var (
-	globalReplicationPool  *ReplicationPool
-	globalReplicationStats *ReplicationStats
+	globalReplicationPool  = once.NewSingleton[ReplicationPool]()
+	globalReplicationStats atomic.Pointer[ReplicationStats]
 )
 
 // ReplicationPool describes replication pool
@@ -1803,6 +1804,7 @@ type ReplicationPool struct {
 	priority    string
 	maxWorkers  int
 	maxLWorkers int
+	stats       *ReplicationStats
 
 	mu     sync.RWMutex
 	mrfMU  sync.Mutex
@@ -1849,7 +1851,7 @@ const (
 )
 
 // NewReplicationPool creates a pool of replication workers of specified size
-func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool {
+func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts, stats *ReplicationStats) *ReplicationPool {
 	var workers, failedWorkers int
 	priority := "auto"
 	maxWorkers := WorkerMaxLimit
@@ -1891,6 +1893,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
 		mrfStopCh:   make(chan struct{}, 1),
 		ctx:         ctx,
 		objLayer:    o,
+		stats:       stats,
 		priority:    priority,
 		maxWorkers:  maxWorkers,
 		maxLWorkers: maxLWorkers,
@@ -1918,11 +1921,11 @@ func (p *ReplicationPool) AddMRFWorker() {
 			}
 			switch v := oi.(type) {
 			case ReplicateObjectInfo:
-				globalReplicationStats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
+				p.stats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
 				atomic.AddInt32(&p.activeMRFWorkers, 1)
 				replicateObject(p.ctx, v, p.objLayer)
 				atomic.AddInt32(&p.activeMRFWorkers, -1)
-				globalReplicationStats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
+				p.stats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
 			default:
 				bugLogIf(p.ctx, fmt.Errorf("unknown mrf replication type: %T", oi), "unknown-mrf-replicate-type")
@@ -1950,9 +1953,9 @@ func (p *ReplicationPool) AddWorker(input <-chan ReplicationWorkerOperation, opT
 			if opTracker != nil {
 				atomic.AddInt32(opTracker, 1)
 			}
-			globalReplicationStats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
+			p.stats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
 			replicateObject(p.ctx, v, p.objLayer)
-			globalReplicationStats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
+			p.stats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
 			if opTracker != nil {
 				atomic.AddInt32(opTracker, -1)
 			}
@@ -1960,10 +1963,10 @@
 			if opTracker != nil {
 				atomic.AddInt32(opTracker, 1)
 			}
-			globalReplicationStats.incQ(v.Bucket, 0, true, v.OpType)
+			p.stats.incQ(v.Bucket, 0, true, v.OpType)
 			replicateDelete(p.ctx, v, p.objLayer)
-			globalReplicationStats.decQ(v.Bucket, 0, true, v.OpType)
+			p.stats.decQ(v.Bucket, 0, true, v.OpType)
 
 			if opTracker != nil {
 				atomic.AddInt32(opTracker, -1)
@@ -1990,9 +1993,9 @@ func (p *ReplicationPool) AddLargeWorker(input <-chan ReplicationWorkerOperation
 			if opTracker != nil {
 				atomic.AddInt32(opTracker, 1)
 			}
-			globalReplicationStats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
+			p.stats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
 			replicateObject(p.ctx, v, p.objLayer)
-			globalReplicationStats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
+			p.stats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
 			if opTracker != nil {
 				atomic.AddInt32(opTracker, -1)
 			}
@@ -2156,7 +2159,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
 		case <-p.ctx.Done():
 		case p.lrgworkers[h%uint64(len(p.lrgworkers))] <- ri:
 		default:
-			globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
+			p.queueMRFSave(ri.ToMRFEntry())
 			p.mu.RLock()
 			maxLWorkers := p.maxLWorkers
 			existing := len(p.lrgworkers)
@@ -2187,7 +2190,7 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
 	case healCh <- ri:
 	case ch <- ri:
 	default:
-		globalReplicationPool.queueMRFSave(ri.ToMRFEntry())
+		globalReplicationPool.Get().queueMRFSave(ri.ToMRFEntry())
 		p.mu.RLock()
 		prio := p.priority
 		maxWorkers := p.maxWorkers
@@ -2223,7 +2226,7 @@ func queueReplicateDeletesWrapper(doi DeletedObjectReplicationInfo, existingObje
 			doi.ResetID = v.ResetID
 			doi.TargetArn = k
 
-			globalReplicationPool.queueReplicaDeleteTask(doi)
+			globalReplicationPool.Get().queueReplicaDeleteTask(doi)
 		}
 	}
 }
@@ -2244,7 +2247,7 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
 	case <-p.ctx.Done():
 	case ch <- doi:
 	default:
-		globalReplicationPool.queueMRFSave(doi.ToMRFEntry())
+		p.queueMRFSave(doi.ToMRFEntry())
 		p.mu.RLock()
 		prio := p.priority
 		maxWorkers := p.maxWorkers
@@ -2274,9 +2277,10 @@ type replicationPoolOpts struct {
 }
 
 func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
-	globalReplicationPool = NewReplicationPool(ctx, objectAPI, globalAPIConfig.getReplicationOpts())
-	globalReplicationStats = NewReplicationStats(ctx, objectAPI)
-	go globalReplicationStats.trackEWMA()
+	stats := NewReplicationStats(ctx, objectAPI)
+	globalReplicationPool.Set(NewReplicationPool(ctx, objectAPI, globalAPIConfig.getReplicationOpts(), stats))
+	globalReplicationStats.Store(stats)
+	go stats.trackEWMA()
 }
 
 type proxyResult struct {
@@ -2482,7 +2486,7 @@ func scheduleReplication(ctx context.Context, oi ObjectInfo, o ObjectLayer, dsc
 	if dsc.Synchronous() {
 		replicateObject(ctx, ri, o)
 	} else {
-		globalReplicationPool.queueReplicaTask(ri)
+		globalReplicationPool.Get().queueReplicaTask(ri)
 	}
 }
 
@@ -2610,9 +2614,9 @@ func proxyGetTaggingToRepTarget(ctx context.Context, bucket, object string, opts
 }
 
 func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer) {
-	globalReplicationPool.queueReplicaDeleteTask(dv)
+	globalReplicationPool.Get().queueReplicaDeleteTask(dv)
 	for arn := range dv.ReplicationState.Targets {
-		globalReplicationStats.Update(dv.Bucket, replicatedTargetInfo{Arn: arn, Size: 0, Duration: 0, OpType: replication.DeleteReplicationType}, replication.Pending, replication.StatusType(""))
+		globalReplicationStats.Load().Update(dv.Bucket, replicatedTargetInfo{Arn: arn, Size: 0, Duration: 0, OpType: replication.DeleteReplicationType}, replication.Pending, replication.StatusType(""))
 	}
 }
 
@@ -3042,9 +3046,9 @@ func (s *replicationResyncer) start(ctx context.Context, objAPI ObjectLayer, opt
 	if len(tgtArns) == 0 {
 		return fmt.Errorf("arn %s specified for resync not found in replication config", opts.arn)
 	}
-	globalReplicationPool.resyncer.RLock()
-	data, ok := globalReplicationPool.resyncer.statusMap[opts.bucket]
-	globalReplicationPool.resyncer.RUnlock()
+	globalReplicationPool.Get().resyncer.RLock()
+	data, ok := globalReplicationPool.Get().resyncer.statusMap[opts.bucket]
+
globalReplicationPool.Get().resyncer.RUnlock() if !ok { data, err = loadBucketResyncMetadata(ctx, opts.bucket, objAPI) if err != nil { @@ -3070,9 +3074,9 @@ func (s *replicationResyncer) start(ctx context.Context, objAPI ObjectLayer, opt return err } - globalReplicationPool.resyncer.Lock() - defer globalReplicationPool.resyncer.Unlock() - brs, ok := globalReplicationPool.resyncer.statusMap[opts.bucket] + globalReplicationPool.Get().resyncer.Lock() + defer globalReplicationPool.Get().resyncer.Unlock() + brs, ok := globalReplicationPool.Get().resyncer.statusMap[opts.bucket] if !ok { brs = BucketReplicationResyncStatus{ Version: resyncMetaVersion, @@ -3080,8 +3084,8 @@ func (s *replicationResyncer) start(ctx context.Context, objAPI ObjectLayer, opt } } brs.TargetsMap[opts.arn] = status - globalReplicationPool.resyncer.statusMap[opts.bucket] = brs - go globalReplicationPool.resyncer.resyncBucket(GlobalContext, objAPI, false, opts) + globalReplicationPool.Get().resyncer.statusMap[opts.bucket] = brs + go globalReplicationPool.Get().resyncer.resyncBucket(GlobalContext, objAPI, false, opts) return nil } @@ -3413,7 +3417,7 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf if roi.ReplicationStatus == replication.Pending || roi.ReplicationStatus == replication.Failed || roi.VersionPurgeStatus == Failed || roi.VersionPurgeStatus == Pending { - globalReplicationPool.queueReplicaDeleteTask(dv) + globalReplicationPool.Get().queueReplicaDeleteTask(dv) return } // if replication status is Complete on DeleteMarker and existing object resync required @@ -3429,12 +3433,12 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf switch roi.ReplicationStatus { case replication.Pending, replication.Failed: roi.EventType = ReplicateHeal - globalReplicationPool.queueReplicaTask(roi) + globalReplicationPool.Get().queueReplicaTask(roi) return } if roi.ExistingObjResync.mustResync() { roi.EventType = ReplicateExisting - globalReplicationPool.queueReplicaTask(roi) + globalReplicationPool.Get().queueReplicaTask(roi) } return } @@ -3499,8 +3503,8 @@ func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) { return } if entry.RetryCount > mrfRetryLimit { // let scanner catch up if retry count exceeded - atomic.AddUint64(&globalReplicationStats.mrfStats.TotalDroppedCount, 1) - atomic.AddUint64(&globalReplicationStats.mrfStats.TotalDroppedBytes, uint64(entry.sz)) + atomic.AddUint64(&p.stats.mrfStats.TotalDroppedCount, 1) + atomic.AddUint64(&p.stats.mrfStats.TotalDroppedBytes, uint64(entry.sz)) return } @@ -3513,8 +3517,8 @@ func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) { select { case p.mrfSaveCh <- entry: default: - atomic.AddUint64(&globalReplicationStats.mrfStats.TotalDroppedCount, 1) - atomic.AddUint64(&globalReplicationStats.mrfStats.TotalDroppedBytes, uint64(entry.sz)) + atomic.AddUint64(&p.stats.mrfStats.TotalDroppedCount, 1) + atomic.AddUint64(&p.stats.mrfStats.TotalDroppedBytes, uint64(entry.sz)) } } } @@ -3563,7 +3567,7 @@ func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string if !p.initialized() { return } - atomic.StoreUint64(&globalReplicationStats.mrfStats.LastFailedCount, uint64(len(entries))) + atomic.StoreUint64(&p.stats.mrfStats.LastFailedCount, uint64(len(entries))) if len(entries) == 0 { return } diff --git a/cmd/bucket-stats.go b/cmd/bucket-stats.go index a51fbf41d..82f8e885a 100644 --- a/cmd/bucket-stats.go +++ b/cmd/bucket-stats.go @@ -310,7 +310,7 @@ type ReplQNodeStats struct { func (r 
*ReplicationStats) getNodeQueueStats(bucket string) (qs ReplQNodeStats) { qs.NodeName = globalLocalNodeName qs.Uptime = UTCNow().Unix() - globalBootTime.Unix() - qs.ActiveWorkers = globalReplicationStats.ActiveWorkers() + qs.ActiveWorkers = globalReplicationStats.Load().ActiveWorkers() qs.XferStats = make(map[RMetricName]XferStats) qs.QStats = r.qCache.getBucketStats(bucket) qs.TgtXferStats = make(map[string]map[RMetricName]XferStats) @@ -402,7 +402,7 @@ func (r *ReplicationStats) getNodeQueueStats(bucket string) (qs ReplQNodeStats) func (r *ReplicationStats) getNodeQueueStatsSummary() (qs ReplQNodeStats) { qs.NodeName = globalLocalNodeName qs.Uptime = UTCNow().Unix() - globalBootTime.Unix() - qs.ActiveWorkers = globalReplicationStats.ActiveWorkers() + qs.ActiveWorkers = globalReplicationStats.Load().ActiveWorkers() qs.XferStats = make(map[RMetricName]XferStats) qs.QStats = r.qCache.getSiteStats() qs.MRFStats = ReplicationMRFStats{ diff --git a/cmd/handler-api.go b/cmd/handler-api.go index 11b67411b..8f6eb3172 100644 --- a/cmd/handler-api.go +++ b/cmd/handler-api.go @@ -170,9 +170,9 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int, legacy bool) { listQuorum = "strict" } t.listQuorum = listQuorum - if globalReplicationPool != nil && + if r := globalReplicationPool.GetNonBlocking(); r != nil && (cfg.ReplicationPriority != t.replicationPriority || cfg.ReplicationMaxWorkers != t.replicationMaxWorkers || cfg.ReplicationMaxLWorkers != t.replicationMaxLWorkers) { - globalReplicationPool.ResizeWorkerPriority(cfg.ReplicationPriority, cfg.ReplicationMaxWorkers, cfg.ReplicationMaxLWorkers) + r.ResizeWorkerPriority(cfg.ReplicationPriority, cfg.ReplicationMaxWorkers, cfg.ReplicationMaxLWorkers) } t.replicationPriority = cfg.ReplicationPriority t.replicationMaxWorkers = cfg.ReplicationMaxWorkers diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index 98c0161e8..3dd3ac192 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -2313,8 +2313,8 @@ func getReplicationNodeMetrics(opts MetricsGroupOpts) *MetricsGroupV2 { var ml []MetricV2 // common operational metrics for bucket replication and site replication - published // at cluster level - if globalReplicationStats != nil { - qs := globalReplicationStats.getNodeQueueStatsSummary() + if rStats := globalReplicationStats.Load(); rStats != nil { + qs := rStats.getNodeQueueStatsSummary() activeWorkersCount := MetricV2{ Description: getClusterReplActiveWorkersCountMD(), } @@ -3245,7 +3245,7 @@ func getBucketUsageMetrics(opts MetricsGroupOpts) *MetricsGroupV2 { var bucketReplStats map[string]BucketStats if !globalSiteReplicationSys.isEnabled() { - bucketReplStats = globalReplicationStats.getAllLatest(dataUsageInfo.BucketsUsage) + bucketReplStats = globalReplicationStats.Load().getAllLatest(dataUsageInfo.BucketsUsage) } for bucket, usage := range dataUsageInfo.BucketsUsage { quota, _ := globalBucketQuotaSys.Get(ctx, bucket) diff --git a/cmd/metrics-v3-bucket-replication.go b/cmd/metrics-v3-bucket-replication.go index 64f65e832..ef341801a 100644 --- a/cmd/metrics-v3-bucket-replication.go +++ b/cmd/metrics-v3-bucket-replication.go @@ -119,7 +119,7 @@ func loadBucketReplicationMetrics(ctx context.Context, m MetricValues, c *metric return nil } - bucketReplStats := globalReplicationStats.getAllLatest(dataUsageInfo.BucketsUsage) + bucketReplStats := globalReplicationStats.Load().getAllLatest(dataUsageInfo.BucketsUsage) for _, bucket := range buckets { labels := []string{bucketL, bucket} if s, ok := bucketReplStats[bucket]; ok { diff --git 
a/cmd/metrics-v3-replication.go b/cmd/metrics-v3-replication.go index da26e0956..44a8e87ae 100644 --- a/cmd/metrics-v3-replication.go +++ b/cmd/metrics-v3-replication.go @@ -69,11 +69,12 @@ var ( // loadClusterReplicationMetrics - `MetricsLoaderFn` for cluster replication metrics // such as transfer rate and objects queued. func loadClusterReplicationMetrics(ctx context.Context, m MetricValues, c *metricsCache) error { - if globalReplicationStats == nil { + st := globalReplicationStats.Load() + if st == nil { return nil } - qs := globalReplicationStats.getNodeQueueStatsSummary() + qs := st.getNodeQueueStatsSummary() qt := qs.QStats m.Set(replicationAverageQueuedBytes, float64(qt.Avg.Bytes)) diff --git a/cmd/metrics.go b/cmd/metrics.go index 42ae31a7b..0548ff2b0 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -300,7 +300,7 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) { } for bucket, usageInfo := range dataUsageInfo.BucketsUsage { - stat := globalReplicationStats.getLatestReplicationStats(bucket) + stat := globalReplicationStats.Load().getLatestReplicationStats(bucket) // Total space used by bucket ch <- prometheus.MustNewConstMetric( prometheus.NewDesc( diff --git a/cmd/notification.go b/cmd/notification.go index e37577566..5d65a8130 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -537,7 +537,7 @@ func (sys *NotificationSys) LoadBucketMetadata(ctx context.Context, bucketName s // DeleteBucketMetadata - calls DeleteBucketMetadata call on all peers func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName string) { - globalReplicationStats.Delete(bucketName) + globalReplicationStats.Load().Delete(bucketName) globalBucketMetadataSys.Remove(bucketName) globalBucketTargetSys.Delete(bucketName) globalEventNotifier.RemoveNotification(bucketName) @@ -591,7 +591,7 @@ func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []Buck } } - replicationStatsList := globalReplicationStats.GetAll() + replicationStatsList := globalReplicationStats.Load().GetAll() bucketStatsMap := BucketStatsMap{ Stats: make(map[string]BucketStats, len(replicationStatsList)), Timestamp: UTCNow(), @@ -599,7 +599,7 @@ func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []Buck for k, replicationStats := range replicationStatsList { bucketStatsMap.Stats[k] = BucketStats{ ReplicationStats: replicationStats, - ProxyStats: globalReplicationStats.getProxyStats(k), + ProxyStats: globalReplicationStats.Load().getProxyStats(k), } } @@ -632,11 +632,13 @@ func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketNam peersLogOnceIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err, nErr.Host.String()) } } - bucketStats = append(bucketStats, BucketStats{ - ReplicationStats: globalReplicationStats.Get(bucketName), - QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{globalReplicationStats.getNodeQueueStats(bucketName)}}, - ProxyStats: globalReplicationStats.getProxyStats(bucketName), - }) + if st := globalReplicationStats.Load(); st != nil { + bucketStats = append(bucketStats, BucketStats{ + ReplicationStats: st.Get(bucketName), + QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{st.getNodeQueueStats(bucketName)}}, + ProxyStats: st.getProxyStats(bucketName), + }) + } return bucketStats } @@ -665,7 +667,7 @@ func (sys *NotificationSys) GetClusterSiteMetrics(ctx context.Context) []SRMetri peersLogOnceIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err, nErr.Host.String()) } } - siteStats = append(siteStats, 
globalReplicationStats.getSRMetricsForNode()) + siteStats = append(siteStats, globalReplicationStats.Load().getSRMetricsForNode()) return siteStats } @@ -1605,7 +1607,7 @@ func (sys *NotificationSys) GetReplicationMRF(ctx context.Context, bucket, node if node != "all" && node != globalLocalNodeName { return nil } - mCh, err := globalReplicationPool.getMRF(ctx, bucket) + mCh, err := globalReplicationPool.Get().getMRF(ctx, bucket) if err != nil { return err } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 4ff2dcc0a..f1bf1f5b7 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -505,11 +505,11 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj if (isErrObjectNotFound(err) || isErrVersionNotFound(err) || isErrReadQuorum(err)) && !(gr != nil && gr.ObjInfo.DeleteMarker) { proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { - globalReplicationStats.incProxy(bucket, getObjectAPI, false) + globalReplicationStats.Load().incProxy(bucket, getObjectAPI, false) // proxy to replication target if active-active replication is in place. reader, proxy, perr = proxyGetToReplicationTarget(ctx, bucket, object, rs, r.Header, opts, proxytgts) if perr != nil { - globalReplicationStats.incProxy(bucket, getObjectAPI, true) + globalReplicationStats.Load().incProxy(bucket, getObjectAPI, true) proxyGetErr := ErrorRespToObjectError(perr, bucket, object) if !isErrBucketNotFound(proxyGetErr) && !isErrObjectNotFound(proxyGetErr) && !isErrVersionNotFound(proxyGetErr) && !isErrPreconditionFailed(proxyGetErr) && !isErrInvalidRange(proxyGetErr) { @@ -1025,14 +1025,14 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob // proxy HEAD to replication target if active-active replication configured on bucket proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { - globalReplicationStats.incProxy(bucket, headObjectAPI, false) + globalReplicationStats.Load().incProxy(bucket, headObjectAPI, false) var oi ObjectInfo oi, proxy = proxyHeadToReplicationTarget(ctx, bucket, object, rs, opts, proxytgts) if proxy.Proxy { objInfo = oi } if proxy.Err != nil { - globalReplicationStats.incProxy(bucket, headObjectAPI, true) + globalReplicationStats.Load().incProxy(bucket, headObjectAPI, true) writeErrorResponseHeadersOnly(w, toAPIError(ctx, proxy.Err)) return } @@ -2090,7 +2090,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } metadata[ReservedMetadataPrefixLower+ReplicaStatus] = replication.Replica.String() metadata[ReservedMetadataPrefixLower+ReplicaTimestamp] = UTCNow().Format(time.RFC3339Nano) - defer globalReplicationStats.UpdateReplicaStat(bucket, size) + defer globalReplicationStats.Load().UpdateReplicaStat(bucket, size) } // Check if bucket encryption is enabled @@ -3301,11 +3301,11 @@ func (api objectAPIHandlers) GetObjectTaggingHandler(w http.ResponseWriter, r *h if isErrObjectNotFound(err) || isErrVersionNotFound(err) { proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { - globalReplicationStats.incProxy(bucket, getObjectTaggingAPI, false) + globalReplicationStats.Load().incProxy(bucket, getObjectTaggingAPI, false) // proxy to replication target if site replication is in place. 
tags, gerr := proxyGetTaggingToRepTarget(ctx, bucket, object, opts, proxytgts) if gerr.Err != nil || tags == nil { - globalReplicationStats.incProxy(bucket, getObjectTaggingAPI, true) + globalReplicationStats.Load().incProxy(bucket, getObjectTaggingAPI, true) writeErrorResponse(ctx, w, toAPIError(ctx, gerr.Err), r.URL) return } // overlay tags from peer site. @@ -3404,11 +3404,11 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h if isErrObjectNotFound(err) || isErrVersionNotFound(err) { proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { - globalReplicationStats.incProxy(bucket, putObjectTaggingAPI, false) + globalReplicationStats.Load().incProxy(bucket, putObjectTaggingAPI, false) // proxy to replication target if site replication is in place. perr := proxyTaggingToRepTarget(ctx, bucket, object, tags, opts, proxytgts) if perr.Err != nil { - globalReplicationStats.incProxy(bucket, putObjectTaggingAPI, true) + globalReplicationStats.Load().incProxy(bucket, putObjectTaggingAPI, true) writeErrorResponse(ctx, w, toAPIError(ctx, perr.Err), r.URL) return } @@ -3501,11 +3501,11 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r if isErrObjectNotFound(err) || isErrVersionNotFound(err) { proxytgts := getProxyTargets(ctx, bucket, object, opts) if !proxytgts.Empty() { - globalReplicationStats.incProxy(bucket, removeObjectTaggingAPI, false) + globalReplicationStats.Load().incProxy(bucket, removeObjectTaggingAPI, false) // proxy to replication target if active-active replication is in place. perr := proxyTaggingToRepTarget(ctx, bucket, object, nil, opts, proxytgts) if perr.Err != nil { - globalReplicationStats.incProxy(bucket, removeObjectTaggingAPI, true) + globalReplicationStats.Load().incProxy(bucket, removeObjectTaggingAPI, true) writeErrorResponse(ctx, w, toAPIError(ctx, perr.Err), r.URL) return } diff --git a/cmd/object-multipart-handlers.go b/cmd/object-multipart-handlers.go index fe267b6ee..512dcec88 100644 --- a/cmd/object-multipart-handlers.go +++ b/cmd/object-multipart-handlers.go @@ -1029,7 +1029,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite } if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok { actualSize, _ := objInfo.GetActualSize() - defer globalReplicationStats.UpdateReplicaStat(bucket, actualSize) + defer globalReplicationStats.Load().UpdateReplicaStat(bucket, actualSize) } // Get object location. diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index ef18d0aea..3a383a156 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -469,7 +469,7 @@ func (s *peerRESTServer) DeleteBucketMetadataHandler(mss *grid.MSS) (np grid.NoP return np, grid.NewRemoteErr(errors.New("Bucket name is missing")) } - globalReplicationStats.Delete(bucketName) + globalReplicationStats.Load().Delete(bucketName) globalBucketMetadataSys.Remove(bucketName) globalBucketTargetSys.Delete(bucketName) globalEventNotifier.RemoveNotification(bucketName) @@ -483,12 +483,12 @@ func (s *peerRESTServer) DeleteBucketMetadataHandler(mss *grid.MSS) (np grid.NoP // GetAllBucketStatsHandler - fetches bucket replication stats for all buckets from this peer. 
func (s *peerRESTServer) GetAllBucketStatsHandler(mss *grid.MSS) (*BucketStatsMap, *grid.RemoteErr) { - replicationStats := globalReplicationStats.GetAll() + replicationStats := globalReplicationStats.Load().GetAll() bucketStatsMap := make(map[string]BucketStats, len(replicationStats)) for k, v := range replicationStats { bucketStatsMap[k] = BucketStats{ ReplicationStats: v, - ProxyStats: globalReplicationStats.getProxyStats(k), + ProxyStats: globalReplicationStats.Load().getProxyStats(k), } } return &BucketStatsMap{Stats: bucketStatsMap, Timestamp: time.Now()}, nil @@ -501,11 +501,14 @@ func (s *peerRESTServer) GetBucketStatsHandler(vars *grid.MSS) (*BucketStats, *g if bucketName == "" { return nil, grid.NewRemoteErrString("Bucket name is missing") } - + st := globalReplicationStats.Load() + if st == nil { + return &BucketStats{}, nil + } bs := BucketStats{ - ReplicationStats: globalReplicationStats.Get(bucketName), - QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{globalReplicationStats.getNodeQueueStats(bucketName)}}, - ProxyStats: globalReplicationStats.getProxyStats(bucketName), + ReplicationStats: st.Get(bucketName), + QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{st.getNodeQueueStats(bucketName)}}, + ProxyStats: st.getProxyStats(bucketName), } return &bs, nil } @@ -516,9 +519,11 @@ func (s *peerRESTServer) GetSRMetricsHandler(mss *grid.MSS) (*SRMetricsSummary, if objAPI == nil { return nil, grid.NewRemoteErr(errServerNotInitialized) } - - sm := globalReplicationStats.getSRMetricsForNode() - return &sm, nil + if st := globalReplicationStats.Load(); st != nil { + sm := st.getSRMetricsForNode() + return &sm, nil + } + return &SRMetricsSummary{}, nil } // LoadBucketMetadataHandler - reloads in memory bucket metadata @@ -1173,7 +1178,7 @@ func (s *peerRESTServer) GetReplicationMRFHandler(w http.ResponseWriter, r *http vars := mux.Vars(r) bucketName := vars[peerRESTBucket] ctx := newContext(r, w, "GetReplicationMRF") - re, err := globalReplicationPool.getMRF(ctx, bucketName) + re, err := globalReplicationPool.Get().getMRF(ctx, bucketName) if err != nil { s.writeErrorResponse(w, err) return diff --git a/cmd/server-main.go b/cmd/server-main.go index b006f900c..19036e5cf 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -1082,7 +1082,7 @@ func serverMain(ctx *cli.Context) { // initialize replication resync state. 
bootstrapTrace("initResync", func() { - globalReplicationPool.initResync(GlobalContext, buckets, newObject) + globalReplicationPool.Get().initResync(GlobalContext, buckets, newObject) }) // Initialize site replication manager after bucket metadata diff --git a/cmd/site-replication.go b/cmd/site-replication.go index 265c99821..728dc80b0 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -5858,7 +5858,7 @@ func (c *SiteReplicationSys) startResync(ctx context.Context, objAPI ObjectLayer }) continue } - if err := globalReplicationPool.resyncer.start(ctx, objAPI, resyncOpts{ + if err := globalReplicationPool.Get().resyncer.start(ctx, objAPI, resyncOpts{ bucket: bucket, arn: tgtArn, resyncID: rs.ResyncID, @@ -5953,8 +5953,8 @@ func (c *SiteReplicationSys) cancelResync(ctx context.Context, objAPI ObjectLaye continue } // update resync state for the bucket - globalReplicationPool.resyncer.Lock() - m, ok := globalReplicationPool.resyncer.statusMap[bucket] + globalReplicationPool.Get().resyncer.Lock() + m, ok := globalReplicationPool.Get().resyncer.statusMap[bucket] if !ok { m = newBucketResyncStatus(bucket) } @@ -5964,8 +5964,8 @@ func (c *SiteReplicationSys) cancelResync(ctx context.Context, objAPI ObjectLaye m.TargetsMap[t.Arn] = st m.LastUpdate = UTCNow() } - globalReplicationPool.resyncer.statusMap[bucket] = m - globalReplicationPool.resyncer.Unlock() + globalReplicationPool.Get().resyncer.statusMap[bucket] = m + globalReplicationPool.Get().resyncer.Unlock() } } @@ -5975,7 +5975,7 @@ func (c *SiteReplicationSys) cancelResync(ctx context.Context, objAPI ObjectLaye return res, err } select { - case globalReplicationPool.resyncer.resyncCancelCh <- struct{}{}: + case globalReplicationPool.Get().resyncer.resyncCancelCh <- struct{}{}: case <-ctx.Done(): } diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index d7a999733..1e36df60d 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -351,7 +351,9 @@ func initTestServerWithBackend(ctx context.Context, t TestErrHandler, testServer // Test Server needs to start before formatting of disks. // Get credential. credentials := globalActiveCred - + if !globalReplicationPool.IsSet() { + globalReplicationPool.Set(nil) + } testServer.Obj = objLayer testServer.rawDiskPaths = disks testServer.Disks = mustGetPoolEndpoints(0, disks...) diff --git a/internal/once/singleton.go b/internal/once/singleton.go new file mode 100644 index 000000000..7d7f30450 --- /dev/null +++ b/internal/once/singleton.go @@ -0,0 +1,46 @@ +package once + +// Singleton contains a pointer to T that must be set once. +// Until the value is set all Get() calls will block. +type Singleton[T any] struct { + v *T + set chan struct{} +} + +// NewSingleton creates a new unset singleton. +func NewSingleton[T any]() *Singleton[T] { + return &Singleton[T]{set: make(chan struct{}), v: nil} +} + +// Get will return the singleton value. +func (s *Singleton[T]) Get() *T { + <-s.set + return s.v +} + +// GetNonBlocking will return the singleton value or nil if not set yet. +func (s *Singleton[T]) GetNonBlocking() *T { + select { + case <-s.set: + return s.v + default: + return nil + } +} + +// IsSet will return whether the singleton has been set. +func (s *Singleton[T]) IsSet() bool { + select { + case <-s.set: + return true + default: + return false + } +} + +// Set the value and unblock all Get requests. +// This may only be called once, a second call will panic. +func (s *Singleton[T]) Set(v *T) { + s.v = v + close(s.set) +}