From 2faba02d6b558b84dfe35c8a56632aa3642ca5f9 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Fri, 23 Feb 2024 09:21:38 -0800
Subject: [PATCH] fix: allow diskInfo at storageRPC to be cached (#19112)

Bonus: convert timedValue into a typed implementation
---
 cmd/admin-handlers-users.go     |  8 ++---
 cmd/bucket-quota.go             | 20 +++++------
 cmd/data-usage.go               | 11 ++----
 cmd/erasure-server-pool.go      | 33 +++---------------
 cmd/metrics-v2.go               | 10 +++---
 cmd/storage-rest-client.go      | 56 +++++++++++++++++++++++--------
 cmd/utils.go                    | 59 +++++++++++++++++++--------------
 cmd/utils_test.go               | 13 +++-----
 cmd/xl-storage-disk-id-check.go | 17 ++++------
 cmd/xl-storage.go               | 23 ++++++-------
 10 files changed, 124 insertions(+), 126 deletions(-)

diff --git a/cmd/admin-handlers-users.go b/cmd/admin-handlers-users.go
index 7409307c7..f2dad4e38 100644
--- a/cmd/admin-handlers-users.go
+++ b/cmd/admin-handlers-users.go
@@ -1204,7 +1204,7 @@ func (a adminAPIHandlers) AccountInfoHandler(w http.ResponseWriter, r *http.Requ
 
         // Rely on older value if usage loading fails from disk.
         bucketStorageCache.Relax = true
-        bucketStorageCache.Update = func() (interface{}, error) {
+        bucketStorageCache.Update = func() (DataUsageInfo, error) {
             ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
             defer done()
 
@@ -1212,11 +1212,7 @@ func (a adminAPIHandlers) AccountInfoHandler(w http.ResponseWriter, r *http.Requ
         }
     })
 
-    var dataUsageInfo DataUsageInfo
-    v, _ := bucketStorageCache.Get()
-    if v != nil {
-        dataUsageInfo, _ = v.(DataUsageInfo)
-    }
+    dataUsageInfo, _ := bucketStorageCache.Get()
 
     // If etcd, dns federation configured list buckets from etcd.
     var err error
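
The change above is the consumer-side half of the typed conversion that repeats
throughout this patch: callers no longer pull an interface{} out of the cache and
type-assert it back. A condensed before/after sketch of the call-site pattern,
illustrative only, written against the DataUsageInfo cache used in this hunk:

    // Before: untyped cache; nil-check plus type assertion at every call site.
    v, _ := bucketStorageCache.Get()
    var dataUsageInfo DataUsageInfo
    if v != nil {
        dataUsageInfo, _ = v.(DataUsageInfo)
    }

    // After: the cache is parameterized; a miss simply yields the zero value.
    dataUsageInfo, _ := bucketStorageCache.Get()
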
diff --git a/cmd/bucket-quota.go b/cmd/bucket-quota.go
index fea29d863..5a8cf5977 100644
--- a/cmd/bucket-quota.go
+++ b/cmd/bucket-quota.go
@@ -42,7 +42,7 @@ func NewBucketQuotaSys() *BucketQuotaSys {
     return &BucketQuotaSys{}
 }
 
-var bucketStorageCache timedValue
+var bucketStorageCache = newTimedValue[DataUsageInfo]()
 
 // Init initialize bucket quota.
 func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
@@ -52,7 +52,7 @@ func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
         bucketStorageCache.TTL = 10 * time.Second
         // Rely on older value if usage loading fails from disk.
         bucketStorageCache.Relax = true
-        bucketStorageCache.Update = func() (interface{}, error) {
+        bucketStorageCache.Update = func() (DataUsageInfo, error) {
             ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
             defer done()
 
@@ -63,23 +63,23 @@ func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
 
 // GetBucketUsageInfo return bucket usage info for a given bucket
 func (sys *BucketQuotaSys) GetBucketUsageInfo(bucket string) (BucketUsageInfo, error) {
-    v, err := bucketStorageCache.Get()
+    dui, err := bucketStorageCache.Get()
     timedout := OperationTimedOut{}
     if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &timedout) {
-        if v != nil {
+        if len(dui.BucketsUsage) > 0 {
             logger.LogOnceIf(GlobalContext, fmt.Errorf("unable to retrieve usage information for bucket: %s, relying on older value cached in-memory: err(%v)", bucket, err), "bucket-usage-cache-"+bucket)
         } else {
             logger.LogOnceIf(GlobalContext, errors.New("unable to retrieve usage information for bucket: %s, no reliable usage value available - quota will not be enforced"), "bucket-usage-empty-"+bucket)
         }
     }
 
-    var bui BucketUsageInfo
-    dui, ok := v.(DataUsageInfo)
-    if ok {
-        bui = dui.BucketsUsage[bucket]
+    if len(dui.BucketsUsage) > 0 {
+        bui, ok := dui.BucketsUsage[bucket]
+        if ok {
+            return bui, nil
+        }
     }
-
-    return bui, nil
+    return BucketUsageInfo{}, nil
 }
 
 // parseBucketQuota parses BucketQuota from json
diff --git a/cmd/data-usage.go b/cmd/data-usage.go
index a4f9ef6b9..95e3fb14f 100644
--- a/cmd/data-usage.go
+++ b/cmd/data-usage.go
@@ -62,7 +62,7 @@ func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dui <-chan
     }
 }
 
-var prefixUsageCache timedValue
+var prefixUsageCache = newTimedValue[map[string]uint64]()
 
 // loadPrefixUsageFromBackend returns prefix usages found in passed buckets
 //
@@ -81,7 +81,7 @@ func loadPrefixUsageFromBackend(ctx context.Context, objAPI ObjectLayer, bucket
 
     // No need to fail upon Update() error, fallback to old value.
     prefixUsageCache.Relax = true
-    prefixUsageCache.Update = func() (interface{}, error) {
+    prefixUsageCache.Update = func() (map[string]uint64, error) {
         m := make(map[string]uint64)
         for _, pool := range z.serverPools {
             for _, er := range pool.sets {
@@ -109,12 +109,7 @@ func loadPrefixUsageFromBackend(ctx context.Context, objAPI ObjectLayer, bucket
         }
     })
 
-    v, _ := prefixUsageCache.Get()
-    if v != nil {
-        return v.(map[string]uint64), nil
-    }
-
-    return map[string]uint64{}, nil
+    return prefixUsageCache.Get()
 }
 
 func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) {
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index bcd01b442..05569d134 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -1001,20 +1001,10 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
     }
 
     object = encodeDirObject(object)
-
     if z.SinglePool() {
-        if !isMinioMetaBucketName(bucket) {
-            avail, err := hasSpaceFor(getDiskInfos(ctx, z.serverPools[0].getHashedSet(object).getDisks()...), data.Size())
-            if err != nil {
-                logger.LogOnceIf(ctx, err, "erasure-write-quorum")
-                return ObjectInfo{}, toObjectErr(errErasureWriteQuorum)
-            }
-            if !avail {
-                return ObjectInfo{}, toObjectErr(errDiskFull)
-            }
-        }
         return z.serverPools[0].PutObject(ctx, bucket, object, data, opts)
     }
+
     if !opts.NoLock {
         ns := z.NewNSLock(bucket, object)
         lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
@@ -1586,16 +1576,6 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
     }
 
     if z.SinglePool() {
-        if !isMinioMetaBucketName(bucket) {
-            avail, err := hasSpaceFor(getDiskInfos(ctx, z.serverPools[0].getHashedSet(object).getDisks()...), -1)
-            if err != nil {
-                logger.LogIf(ctx, err)
-                return nil, toObjectErr(errErasureWriteQuorum)
-            }
-            if !avail {
-                return nil, toObjectErr(errDiskFull)
-            }
-        }
         return z.serverPools[0].NewMultipartUpload(ctx, bucket, object, opts)
     }
 
@@ -1860,7 +1840,7 @@ func (z *erasureServerPools) deleteAll(ctx context.Context, bucket, prefix strin
     }
 }
 
-var listBucketsCache timedValue
+var listBucketsCache = newTimedValue[[]BucketInfo]()
 
 // List all buckets from one of the serverPools, we are not doing merge
 // sort here just for simplification. As per design it is assumed
@@ -1871,7 +1851,7 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions
 
         listBucketsCache.TTL = time.Second
         listBucketsCache.Relax = true
-        listBucketsCache.Update = func() (interface{}, error) {
+        listBucketsCache.Update = func() ([]BucketInfo, error) {
             ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
             buckets, err = z.s3Peer.ListBuckets(ctx, opts)
             cancel()
@@ -1888,12 +1868,7 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions
         }
     })
 
-        v, _ := listBucketsCache.Get()
-        if v != nil {
-            return v.([]BucketInfo), nil
-        }
-
-        return buckets, nil
+        return listBucketsCache.Get()
     }
 
     buckets, err = z.s3Peer.ListBuckets(ctx, opts)
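
Every cache in this patch is wired the same way: a package-level typed value,
configured exactly once, with Relax set so a failed refresh serves the last good
value instead of an error. A minimal sketch of the idiom as listBucketsCache uses
it above; fetchBuckets is a hypothetical stand-in for the z.s3Peer.ListBuckets call:

    var listBucketsCache = newTimedValue[[]BucketInfo]()

    func cachedListBuckets(ctx context.Context) ([]BucketInfo, error) {
        listBucketsCache.Once.Do(func() {
            listBucketsCache.TTL = time.Second
            listBucketsCache.Relax = true // on Update failure, serve the last good value
            listBucketsCache.Update = func() ([]BucketInfo, error) {
                ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
                defer cancel()
                return fetchBuckets(ctx) // hypothetical fetch over the peer client
            }
        })
        return listBucketsCache.Get()
    }
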
diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go
index 7f1192db7..fe68835ff 100644
--- a/cmd/metrics-v2.go
+++ b/cmd/metrics-v2.go
@@ -330,7 +330,7 @@ type Metric struct {
 
 // MetricsGroup are a group of metrics that are initialized together.
 type MetricsGroup struct {
-    metricsCache     timedValue `msg:"-"`
+    metricsCache     *timedValue[[]Metric] `msg:"-"`
     cacheInterval    time.Duration
     metricsGroupOpts MetricsGroupOpts
 }
@@ -354,10 +354,11 @@ type MetricsGroupOpts struct {
 // RegisterRead register the metrics populator function to be used
 // to populate new values upon cache invalidation.
 func (g *MetricsGroup) RegisterRead(read func(ctx context.Context) []Metric) {
+    g.metricsCache = newTimedValue[[]Metric]()
     g.metricsCache.Once.Do(func() {
         g.metricsCache.Relax = true
         g.metricsCache.TTL = g.cacheInterval
-        g.metricsCache.Update = func() (interface{}, error) {
+        g.metricsCache.Update = func() ([]Metric, error) {
             if g.metricsGroupOpts.dependGlobalObjectAPI {
                 objLayer := newObjectLayerFn()
                 // Service not initialized yet
@@ -445,9 +446,8 @@ func (m *Metric) clone() Metric {
 // once the TTL expires "read()" registered function is called
 // to return the new values and updated.
 func (g *MetricsGroup) Get() (metrics []Metric) {
-    c, _ := g.metricsCache.Get()
-    m, ok := c.([]Metric)
-    if !ok {
+    m, _ := g.metricsCache.Get()
+    if len(m) == 0 {
         return []Metric{}
     }
 
diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go
index 9e0f37096..75c352b88 100644
--- a/cmd/storage-rest-client.go
+++ b/cmd/storage-rest-client.go
@@ -166,6 +166,8 @@ type storageRESTClient struct {
     formatData  []byte
     formatMutex sync.RWMutex
 
+    diskInfoCache *timedValue[DiskInfo]
+
     // Indexes, will be -1 until assigned a set.
     poolIndex, setIndex, diskIndex int
 }
@@ -306,21 +308,48 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOpti
         // transport is already down.
         return info, errDiskNotFound
     }
 
-    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
-    defer cancel()
-    opts.DiskID = client.diskID
+    // if metrics was asked, or it was a NoOp we do not need to cache the value.
+    if opts.Metrics || opts.NoOp {
+        ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+        defer cancel()
 
-    infop, err := storageDiskInfoRPC.Call(ctx, client.gridConn, &opts)
-    if err != nil {
-        return info, toStorageErr(err)
-    }
-    info = *infop
-    if info.Error != "" {
-        return info, toStorageErr(errors.New(info.Error))
-    }
+        opts.DiskID = client.diskID
+
+        infop, err := storageDiskInfoRPC.Call(ctx, client.gridConn, &opts)
+        if err != nil {
+            return info, toStorageErr(err)
+        }
+        info = *infop
+        if info.Error != "" {
+            return info, toStorageErr(errors.New(info.Error))
+        }
+        info.Scanning = atomic.LoadInt32(&client.scanning) == 1
+        return info, nil
+    } // In all other cases cache the value upto 1sec.
+
+    client.diskInfoCache.Once.Do(func() {
+        client.diskInfoCache.TTL = time.Second
+        client.diskInfoCache.Update = func() (info DiskInfo, err error) {
+            ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+            defer cancel()
+
+            nopts := DiskInfoOptions{DiskID: client.diskID}
+            infop, err := storageDiskInfoRPC.Call(ctx, client.gridConn, &nopts)
+            if err != nil {
+                return info, toStorageErr(err)
+            }
+            info = *infop
+            if info.Error != "" {
+                return info, toStorageErr(errors.New(info.Error))
+            }
+            return info, nil
+        }
+    })
+
+    info, err = client.diskInfoCache.Get()
     info.Scanning = atomic.LoadInt32(&client.scanning) == 1
-    return info, nil
+    return info, err
 }
 
 // MakeVolBulk - create multiple volumes in a bulk operation.
@@ -863,6 +892,7 @@ func newStorageRESTClient(endpoint Endpoint, healthCheck bool, gm *grid.Manager)
     }
     return &storageRESTClient{
         endpoint: endpoint, restClient: restClient, poolIndex: -1, setIndex: -1, diskIndex: -1,
-        gridConn: conn,
+        gridConn:      conn,
+        diskInfoCache: newTimedValue[DiskInfo](),
     }, nil
 }
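
The DiskInfo hunk above splits the handler into two paths. Requests with
opts.Metrics or opts.NoOp set still make a fresh RPC every time (per the committed
comment, those results are not cached), while plain capacity queries are answered
from a per-client cache with a one-second TTL. The practical effect: each remote
disk sees roughly at most one DiskInfo RPC per second from a given
storageRESTClient regardless of caller fan-in, because concurrent Get() callers
block on the single in-flight Update rather than issuing their own RPCs. A
schematic of the dispatch, simplified from the code above; fetchFreshDiskInfo is a
hypothetical stand-in for the storageDiskInfoRPC call:

    func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOptions) (DiskInfo, error) {
        if opts.Metrics || opts.NoOp {
            return client.fetchFreshDiskInfo(ctx, opts) // always over the wire
        }
        return client.diskInfoCache.Get() // at most ~1 RPC per TTL window
    }
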
diff --git a/cmd/utils.go b/cmd/utils.go
index a4a5524c1..34fa64deb 100644
--- a/cmd/utils.go
+++ b/cmd/utils.go
@@ -929,13 +929,13 @@ func iamPolicyClaimNameSA() string {
 // timedValue contains a synchronized value that is considered valid
 // for a specific amount of time.
 // An Update function must be set to provide an updated value when needed.
-type timedValue struct {
+type timedValue[I any] struct {
     // Update must return an updated value.
     // If an error is returned the cached value is not set.
     // Only one caller will call this function at any time, others will be blocking.
     // The returned value can no longer be modified once returned.
     // Should be set before calling Get().
-    Update func() (interface{}, error)
+    Update func() (item I, err error)
 
     // TTL for a cached value.
     // If not set 1 second TTL is assumed.
@@ -951,20 +951,26 @@ type timedValue struct {
     Once sync.Once
 
     // Managed values.
-    value      interface{}
+    value      I
+    valueSet   bool
     lastUpdate time.Time
     mu         sync.RWMutex
 }
 
+// newTimedValue
+func newTimedValue[I any]() *timedValue[I] {
+    return &timedValue[I]{}
+}
+
 // Get will return a cached value or fetch a new one.
 // If the Update function returns an error the value is forwarded as is and not cached.
-func (t *timedValue) Get() (interface{}, error) {
-    v := t.get(t.ttl())
-    if v != nil {
-        return v, nil
+func (t *timedValue[I]) Get() (item I, err error) {
+    item, ok := t.get(t.ttl())
+    if ok {
+        return item, nil
     }
 
-    v, err := t.Update()
+    item, err = t.Update()
     if err != nil {
         if t.Relax {
             // if update fails, return current
@@ -973,17 +979,19 @@ func (t *timedValue) Get() (interface{}, error) {
             // Let the caller decide if they want
             // to use the returned value based
             // on error.
-            v = t.get(0)
-            return v, err
+            item, ok = t.get(0)
+            if ok {
+                return item, err
+            }
         }
-        return v, err
+        return item, err
     }
 
-    t.update(v)
-    return v, nil
+    t.update(item)
+    return item, nil
 }
 
-func (t *timedValue) ttl() time.Duration {
+func (t *timedValue[_]) ttl() time.Duration {
     ttl := t.TTL
     if ttl <= 0 {
         ttl = time.Second
@@ -991,23 +999,26 @@ func (t *timedValue) ttl() time.Duration {
     return ttl
 }
 
-func (t *timedValue) get(ttl time.Duration) (v interface{}) {
+func (t *timedValue[I]) get(ttl time.Duration) (item I, ok bool) {
     t.mu.RLock()
     defer t.mu.RUnlock()
-    v = t.value
-    if ttl <= 0 {
-        return v
+    if t.valueSet {
+        item = t.value
+        if ttl <= 0 {
+            return item, true
+        }
+        if time.Since(t.lastUpdate) < ttl {
+            return item, true
+        }
     }
-    if time.Since(t.lastUpdate) < ttl {
-        return v
-    }
-    return nil
+    return item, false
 }
 
-func (t *timedValue) update(v interface{}) {
+func (t *timedValue[I]) update(item I) {
     t.mu.Lock()
     defer t.mu.Unlock()
-    t.value = v
+    t.value = item
+    t.valueSet = true
     t.lastUpdate = time.Now()
 }
 
diff --git a/cmd/utils_test.go b/cmd/utils_test.go
index e788ca478..406319fbc 100644
--- a/cmd/utils_test.go
+++ b/cmd/utils_test.go
@@ -402,28 +402,25 @@ func TestGetMinioMode(t *testing.T) {
 }
 
 func TestTimedValue(t *testing.T) {
-    var cache timedValue
+    cache := newTimedValue[time.Time]()
     t.Parallel()
     cache.Once.Do(func() {
         cache.TTL = 2 * time.Second
-        cache.Update = func() (interface{}, error) {
+        cache.Update = func() (time.Time, error) {
             return time.Now(), nil
         }
     })
 
-    i, _ := cache.Get()
-    t1 := i.(time.Time)
+    t1, _ := cache.Get()
 
-    j, _ := cache.Get()
-    t2 := j.(time.Time)
+    t2, _ := cache.Get()
 
     if !t1.Equal(t2) {
         t.Fatalf("expected time to be equal: %s != %s", t1, t2)
     }
 
     time.Sleep(3 * time.Second)
-    k, _ := cache.Get()
-    t3 := k.(time.Time)
+    t3, _ := cache.Get()
 
     if t1.Equal(t3) {
         t.Fatalf("expected time to be un-equal: %s == %s", t1, t3)
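
Two behavioral details of the typed implementation are easy to miss. First, Get()
now returns (I, error) directly, so a mismatched type is a compile-time error
rather than a silently failed assertion. Second, the valueSet flag replaces the
old v != nil test, which means a legitimate zero value (an empty DiskInfo, a nil
slice) can now be cached and still count as a hit. A minimal usage sketch
mirroring TestTimedValue above, assuming only the timedValue code from
cmd/utils.go:

    cache := newTimedValue[time.Time]()
    cache.TTL = 2 * time.Second
    cache.Update = func() (time.Time, error) {
        return time.Now(), nil // refresh function; runs at most once per TTL window
    }

    t1, _ := cache.Get()        // cold: invokes Update and caches the result
    t2, _ := cache.Get()        // warm: same cached value, t1.Equal(t2) holds
    time.Sleep(3 * time.Second) // let the 2s TTL lapse
    t3, _ := cache.Get()        // stale: Update runs again, t3 differs from t1
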
diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go
index 9ea8a0ebd..6b18cb0de 100644
--- a/cmd/xl-storage-disk-id-check.go
+++ b/cmd/xl-storage-disk-id-check.go
@@ -89,7 +89,7 @@ type xlStorageDiskIDCheck struct {
     health      *diskHealthTracker
     healthCheck bool
 
-    metricsCache timedValue
+    metricsCache *timedValue[DiskMetrics]
     diskCtx      context.Context
     diskCancel   context.CancelFunc
 }
@@ -97,7 +97,7 @@ type xlStorageDiskIDCheck struct {
 func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
     p.metricsCache.Once.Do(func() {
         p.metricsCache.TTL = 5 * time.Second
-        p.metricsCache.Update = func() (interface{}, error) {
+        p.metricsCache.Update = func() (DiskMetrics, error) {
             diskMetric := DiskMetrics{
                 LastMinute: make(map[string]AccElem, len(p.apiLatencies)),
                 APICalls:   make(map[string]uint64, len(p.apiCalls)),
@@ -111,12 +111,8 @@ func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
             return diskMetric, nil
         }
     })
 
-    m, _ := p.metricsCache.Get()
-    diskMetric := DiskMetrics{}
-    if m != nil {
-        diskMetric = m.(DiskMetrics)
-    }
+    diskMetric, _ := p.metricsCache.Get()
 
     // Do not need this value to be cached.
     diskMetric.TotalErrorsTimeout = p.totalErrsTimeout.Load()
     diskMetric.TotalErrorsAvailability = p.totalErrsAvailability.Load()
@@ -180,9 +176,10 @@ func (e *lockedLastMinuteLatency) total() AccElem {
 
 func newXLStorageDiskIDCheck(storage *xlStorage, healthCheck bool) *xlStorageDiskIDCheck {
     xl := xlStorageDiskIDCheck{
-        storage:     storage,
-        health:      newDiskHealthTracker(),
-        healthCheck: healthCheck && globalDriveMonitoring,
+        storage:      storage,
+        health:       newDiskHealthTracker(),
+        healthCheck:  healthCheck && globalDriveMonitoring,
+        metricsCache: newTimedValue[DiskMetrics](),
     }
 
     xl.totalWrites.Store(xl.storage.getWriteAttribute())
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 1184a2f75..7a1aa3815 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -112,7 +112,7 @@ type xlStorage struct {
     formatLegacy    bool
     formatLastCheck time.Time
 
-    diskInfoCache timedValue
+    diskInfoCache *timedValue[DiskInfo]
     sync.RWMutex
 
     formatData []byte
@@ -230,12 +230,13 @@ func makeFormatErasureMetaVolumes(disk StorageAPI) error {
 // Initialize a new storage disk.
 func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
     s = &xlStorage{
-        drivePath:  ep.Path,
-        endpoint:   ep,
-        globalSync: globalFSOSync,
-        poolIndex:  -1,
-        setIndex:   -1,
-        diskIndex:  -1,
+        drivePath:     ep.Path,
+        endpoint:      ep,
+        globalSync:    globalFSOSync,
+        diskInfoCache: newTimedValue[DiskInfo](),
+        poolIndex:     -1,
+        setIndex:      -1,
+        diskIndex:     -1,
     }
 
     s.drivePath, err = getValidPath(ep.Path)
@@ -732,7 +733,7 @@ func (s *xlStorage) setWriteAttribute(writeCount uint64) error {
 func (s *xlStorage) DiskInfo(_ context.Context, _ DiskInfoOptions) (info DiskInfo, err error) {
     s.diskInfoCache.Once.Do(func() {
         s.diskInfoCache.TTL = time.Second
-        s.diskInfoCache.Update = func() (interface{}, error) {
+        s.diskInfoCache.Update = func() (DiskInfo, error) {
             dcinfo := DiskInfo{}
             di, err := getDiskInfo(s.drivePath)
             if err != nil {
@@ -758,11 +759,7 @@ func (s *xlStorage) DiskInfo(_ context.Context, _ DiskInfoOptions) (info DiskInf
         }
     })
 
-    v, err := s.diskInfoCache.Get()
-    if v != nil {
-        info = v.(DiskInfo)
-    }
-
+    info, err = s.diskInfoCache.Get()
     info.MountPath = s.drivePath
     info.Endpoint = s.endpoint.String()
     info.Scanning = atomic.LoadInt32(&s.scanning) == 1
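
The same layering shows up at the end of the local DiskInfo above and in the RPC
client earlier: only the expensive, slowly-changing part of DiskInfo is cached,
and per-call fields are stamped on top of whatever Get() returns, so even a
one-second-stale hit reports the live mount path, endpoint, and scanner state.
Schematically, simplified from the (truncated) hunk above:

    info, err = s.diskInfoCache.Get()                  // cached, possibly ~1s old
    info.MountPath = s.drivePath                       // always current
    info.Endpoint = s.endpoint.String()                // always current
    info.Scanning = atomic.LoadInt32(&s.scanning) == 1 // always current
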