From 47c09a1e6fbf8f8de1b37d76864118585d56ce4a Mon Sep 17 00:00:00 2001 From: Poorna Krishnamoorthy Date: Sat, 3 Apr 2021 09:03:42 -0700 Subject: [PATCH] Various improvements in replication (#11949) - collect real time replication metrics for prometheus. - add pending_count, failed_count metric for total pending/failed replication operations. - add API to get replication metrics - add MRF worker to handle spill-over replication operations - multiple issues found with replication - fixes an issue when client sends a bucket name with `/` at the end from SetRemoteTarget API call make sure to trim the bucket name to avoid any extra `/`. - hold write locks in GetObjectNInfo during replication to ensure that object version stack is not overwritten while reading the content. - add additional protection during WriteMetadata() to ensure that we always write a valid FileInfo{} and avoid ever writing empty FileInfo{} to the lowest layers. Co-authored-by: Poorna Krishnamoorthy Co-authored-by: Harshavardhana --- cmd/admin-bucket-handlers.go | 12 +- cmd/admin-handlers-users.go | 2 +- cmd/api-router.go | 9 +- cmd/bucket-handlers.go | 40 +- cmd/bucket-metadata-sys.go | 5 +- cmd/bucket-metadata.go | 4 +- cmd/bucket-quota.go | 4 +- cmd/bucket-replication-stats.go | 173 +++++ cmd/bucket-replication.go | 108 ++- cmd/bucket-targets.go | 8 +- cmd/config/api/api.go | 19 +- cmd/data-scanner.go | 12 +- cmd/data-update-tracker.go | 4 +- cmd/data-usage-cache.go | 187 ++++-- cmd/data-usage-cache_gen.go | 1075 ++++++++++++++++++++++++------ cmd/data-usage-cache_gen_test.go | 339 ++++++++++ cmd/data-usage.go | 19 +- cmd/erasure-metadata-utils.go | 3 +- cmd/erasure-metadata.go | 8 +- cmd/erasure-object.go | 6 +- cmd/erasure-server-pool.go | 4 +- cmd/fs-v1.go | 2 +- cmd/gateway-unsupported.go | 2 +- cmd/metrics-v2.go | 79 ++- cmd/metrics.go | 62 +- cmd/object-api-datatypes.go | 51 -- cmd/object-api-interface.go | 2 +- cmd/object-handlers.go | 16 +- cmd/web-handlers.go | 2 +- cmd/xl-storage-format-v2.go | 31 +- cmd/xl-storage.go | 9 +- docs/metrics/prometheus/list.md | 1 + pkg/madmin/README.md | 10 +- pkg/madmin/info-commands.go | 88 ++- pkg/madmin/update-commands.go | 8 +- pkg/madmin/user-commands.go | 6 +- 36 files changed, 1914 insertions(+), 496 deletions(-) create mode 100644 cmd/bucket-replication-stats.go diff --git a/cmd/admin-bucket-handlers.go b/cmd/admin-bucket-handlers.go index 1990243be..4793e1ae5 100644 --- a/cmd/admin-bucket-handlers.go +++ b/cmd/admin-bucket-handlers.go @@ -21,6 +21,7 @@ import ( "io" "io/ioutil" "net/http" + "path" "github.com/gorilla/mux" "github.com/minio/minio/cmd/logger" @@ -50,7 +51,7 @@ func (a adminAPIHandlers) PutBucketQuotaConfigHandler(w http.ResponseWriter, r * } vars := mux.Vars(r) - bucket := vars["bucket"] + bucket := path.Clean(vars["bucket"]) if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil { writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) @@ -90,7 +91,8 @@ func (a adminAPIHandlers) GetBucketQuotaConfigHandler(w http.ResponseWriter, r * } vars := mux.Vars(r) - bucket := vars["bucket"] + bucket := path.Clean(vars["bucket"]) + if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil { writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) return @@ -118,7 +120,7 @@ func (a adminAPIHandlers) SetRemoteTargetHandler(w http.ResponseWriter, r *http. 
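// A minimal sketch (standalone program, names illustrative) of the
// trailing-slash fix applied across the admin handlers above: path.Clean
// collapses repeated separators and drops a trailing "/", so a bucket name
// sent as "mybucket/" to SetRemoteTarget normalizes to "mybucket" before
// any lookup.
package main

import (
	"fmt"
	"path"
)

func main() {
	for _, raw := range []string{"mybucket", "mybucket/", "mybucket//"} {
		fmt.Printf("%q -> %q\n", raw, path.Clean(raw)) // all normalize to "mybucket"
	}
}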
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) vars := mux.Vars(r) - bucket := vars["bucket"] + bucket := path.Clean(vars["bucket"]) update := r.URL.Query().Get("update") == "true" if !globalIsErasure { @@ -211,7 +213,7 @@ func (a adminAPIHandlers) ListRemoteTargetsHandler(w http.ResponseWriter, r *htt defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) vars := mux.Vars(r) - bucket := vars["bucket"] + bucket := path.Clean(vars["bucket"]) arnType := vars["type"] if !globalIsErasure { writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) @@ -250,7 +252,7 @@ func (a adminAPIHandlers) RemoveRemoteTargetHandler(w http.ResponseWriter, r *ht defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) vars := mux.Vars(r) - bucket := vars["bucket"] + bucket := path.Clean(vars["bucket"]) arn := vars["arn"] if !globalIsErasure { diff --git a/cmd/admin-handlers-users.go b/cmd/admin-handlers-users.go index 0bb353a2b..ad2314cf7 100644 --- a/cmd/admin-handlers-users.go +++ b/cmd/admin-handlers-users.go @@ -775,7 +775,7 @@ func (a adminAPIHandlers) AccountInfoHandler(w http.ResponseWriter, r *http.Requ if !dataUsageInfo.LastUpdate.IsZero() { size = dataUsageInfo.BucketsUsage[bucket.Name].Size } - acctInfo.Buckets = append(acctInfo.Buckets, madmin.BucketUsageInfo{ + acctInfo.Buckets = append(acctInfo.Buckets, madmin.BucketAccessInfo{ Name: bucket.Name, Created: bucket.Created, Size: size, diff --git a/cmd/api-router.go b/cmd/api-router.go index 870b72b12..9bd6d9bf2 100644 --- a/cmd/api-router.go +++ b/cmd/api-router.go @@ -311,7 +311,6 @@ func registerAPIRouter(router *mux.Router) { // GetBucketReplicationConfig router.Methods(http.MethodGet).HandlerFunc( collectAPIStats("getbucketreplicationconfiguration", maxClients(httpTraceAll(api.GetBucketReplicationConfigHandler)))).Queries("replication", "") - // GetBucketVersioning router.Methods(http.MethodGet).HandlerFunc( collectAPIStats("getbucketversioning", maxClients(httpTraceAll(api.GetBucketVersioningHandler)))).Queries("versioning", "") @@ -378,8 +377,6 @@ func registerAPIRouter(router *mux.Router) { // PutBucketReplicationConfig router.Methods(http.MethodPut).HandlerFunc( collectAPIStats("putbucketreplicationconfiguration", maxClients(httpTraceAll(api.PutBucketReplicationConfigHandler)))).Queries("replication", "") - // GetObjectRetention - // PutBucketEncryption router.Methods(http.MethodPut).HandlerFunc( collectAPIStats("putbucketencryption", maxClients(httpTraceAll(api.PutBucketEncryptionHandler)))).Queries("encryption", "") @@ -430,6 +427,12 @@ func registerAPIRouter(router *mux.Router) { // ListObjectsV1 (Legacy) router.Methods(http.MethodGet).HandlerFunc( collectAPIStats("listobjectsv1", maxClients(httpTraceAll(api.ListObjectsV1Handler)))) + + // MinIO extension API for replication. + // + // GetBucketReplicationMetrics + router.Methods(http.MethodGet).HandlerFunc( + collectAPIStats("getbucketreplicationmetrics", maxClients(httpTraceAll(api.GetBucketReplicationMetricsHandler)))).Queries("replication-metrics", "") } /// Root operation diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 91d673e0e..3e2e12eac 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -20,6 +20,7 @@ import ( "bytes" "crypto/subtle" "encoding/base64" + "encoding/json" "encoding/xml" "fmt" "io" @@ -1243,7 +1244,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. return } } - + globalReplicationStats.Delete(ctx, bucket) // Write success response. 
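// A hedged sketch of how a query-gated route like the new
// GetBucketReplicationMetrics registration above is matched with
// gorilla/mux: .Queries("replication-metrics", "") binds the handler only
// to GET requests carrying that query key. The handler body and address
// here are illustrative, not the server's actual wiring.
package main

import (
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

func metricsHandler(w http.ResponseWriter, r *http.Request) {
	w.Write([]byte("replication metrics for " + mux.Vars(r)["bucket"]))
}

func main() {
	r := mux.NewRouter()
	// Matches e.g. GET /mybucket?replication-metrics=
	r.Methods(http.MethodGet).Path("/{bucket}").
		HandlerFunc(metricsHandler).
		Queries("replication-metrics", "")
	log.Fatal(http.ListenAndServe(":8080", r))
}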
writeSuccessNoContent(w) @@ -1603,3 +1604,40 @@ func (api objectAPIHandlers) DeleteBucketReplicationConfigHandler(w http.Respons // Write success response. writeSuccessResponseHeadersOnly(w) } + +// GetBucketReplicationMetricsHandler - GET Bucket replication metrics. +// ---------- +// Gets the replication metrics for a bucket. +func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "GetBucketReplicationMetrics") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) + return + } + + // check if user has permissions to perform this operation + if s3Error := checkRequestAuthType(ctx, r, policy.GetReplicationConfigurationAction, bucket, ""); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r)) + return + } + + // Check if bucket exists. + if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + metrics := globalReplicationStats.Get(bucket) + if err := json.NewEncoder(w).Encode(&metrics); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + w.(http.Flusher).Flush() +} diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index 05b1cea50..42d001099 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -169,7 +169,10 @@ func (sys *BucketMetadataSys) Update(bucket string, configFile string, configDat } meta.ReplicationConfigXML = configData case bucketTargetsFile: - meta.BucketTargetsConfigJSON, meta.BucketTargetsConfigMetaJSON, err = encryptBucketMetadata(meta.Name, configData, crypto.Context{bucket: meta.Name, bucketTargetsFile: bucketTargetsFile}) + meta.BucketTargetsConfigJSON, meta.BucketTargetsConfigMetaJSON, err = encryptBucketMetadata(meta.Name, configData, crypto.Context{ + bucket: meta.Name, + bucketTargetsFile: bucketTargetsFile, + }) if err != nil { return fmt.Errorf("Error encrypting bucket target metadata %w", err) } diff --git a/cmd/bucket-metadata.go b/cmd/bucket-metadata.go index 2a8a9adaa..450677416 100644 --- a/cmd/bucket-metadata.go +++ b/cmd/bucket-metadata.go @@ -416,6 +416,7 @@ func encryptBucketMetadata(bucket string, input []byte, kmsContext crypto.Contex if err != nil { return } + outbuf := bytes.NewBuffer(nil) objectKey := crypto.GenerateKey(key, rand.Reader) sealedKey = objectKey.Seal(key, crypto.GenerateIV(rand.Reader), crypto.S3.String(), bucket, "") @@ -437,7 +438,6 @@ func decryptBucketMetadata(input []byte, bucket string, meta map[string]string, return nil, errKMSNotConfigured } keyID, kmsKey, sealedKey, err := crypto.S3.ParseMetadata(meta) - if err != nil { return nil, err } @@ -449,8 +449,8 @@ func decryptBucketMetadata(input []byte, bucket string, meta map[string]string, if err = objectKey.Unseal(extKey, sealedKey, crypto.S3.String(), bucket, ""); err != nil { return nil, err } + outbuf := bytes.NewBuffer(nil) _, err = sio.Decrypt(outbuf, bytes.NewBuffer(input), sio.Config{Key: objectKey[:], MinVersion: sio.Version20}) - return outbuf.Bytes(), err } diff --git a/cmd/bucket-quota.go b/cmd/bucket-quota.go index b9f7273bf..b281d7553 100644 --- a/cmd/bucket-quota.go +++ b/cmd/bucket-quota.go @@ -88,7 +88,7 @@ func (sys 
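// A hedged sketch of the seal/unseal pattern behind
// encryptBucketMetadata/decryptBucketMetadata above, using
// github.com/minio/sio directly. The 32-byte key is a stand-in for the
// KMS-unsealed object key (real callers derive it via crypto.GenerateKey
// and objectKey.Seal/Unseal).
package main

import (
	"bytes"
	"fmt"

	"github.com/minio/sio"
)

func main() {
	key := make([]byte, 32) // stand-in only; never use a zero key in practice
	plaintext := []byte(`{"targets":[]}`)

	var sealed bytes.Buffer
	if _, err := sio.Encrypt(&sealed, bytes.NewReader(plaintext), sio.Config{Key: key, MinVersion: sio.Version20}); err != nil {
		panic(err)
	}
	var opened bytes.Buffer
	if _, err := sio.Decrypt(&opened, &sealed, sio.Config{Key: key, MinVersion: sio.Version20}); err != nil {
		panic(err)
	}
	fmt.Println(bytes.Equal(opened.Bytes(), plaintext)) // true
}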
*BucketQuotaSys) check(ctx context.Context, bucket string, size int64) return err } - dui := v.(DataUsageInfo) + dui := v.(madmin.DataUsageInfo) bui, ok := dui.BucketsUsage[bucket] if !ok { @@ -115,7 +115,7 @@ func enforceBucketQuota(ctx context.Context, bucket string, size int64) error { // enforceFIFOQuota deletes objects in FIFO order until sufficient objects // have been deleted so as to bring bucket usage within quota. -func enforceFIFOQuotaBucket(ctx context.Context, objectAPI ObjectLayer, bucket string, bui BucketUsageInfo) { +func enforceFIFOQuotaBucket(ctx context.Context, objectAPI ObjectLayer, bucket string, bui madmin.BucketUsageInfo) { // Check if the current bucket has quota restrictions, if not skip it cfg, err := globalBucketQuotaSys.Get(bucket) if err != nil { diff --git a/cmd/bucket-replication-stats.go b/cmd/bucket-replication-stats.go new file mode 100644 index 000000000..844b1e6e9 --- /dev/null +++ b/cmd/bucket-replication-stats.go @@ -0,0 +1,173 @@ +/* + * MinIO Cloud Storage, (C) 2021 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/minio/minio/pkg/bucket/replication" +) + +// BucketReplicationStats represents inline replication statistics +// such as pending, failed and completed bytes in total for a bucket +type BucketReplicationStats struct { + // Pending size in bytes + PendingSize uint64 `json:"pendingReplicationSize"` + // Completed size in bytes + ReplicatedSize uint64 `json:"completedReplicationSize"` + // Total Replica size in bytes + ReplicaSize uint64 `json:"replicaSize"` + // Failed size in bytes + FailedSize uint64 `json:"failedReplicationSize"` + // Total number of pending operations including metadata updates + PendingCount uint64 `json:"pendingReplicationCount"` + // Total number of failed operations including metadata updates + FailedCount uint64 `json:"failedReplicationCount"` +} + +func (b *BucketReplicationStats) hasReplicationUsage() bool { + return b.PendingSize > 0 || + b.FailedSize > 0 || + b.ReplicatedSize > 0 || + b.ReplicaSize > 0 || + b.PendingCount > 0 || + b.FailedCount > 0 +} + +// ReplicationStats holds the global in-memory replication stats +type ReplicationStats struct { + sync.RWMutex + Cache map[string]*BucketReplicationStats +} + +// Delete deletes in-memory replication statistics for a bucket. +func (r *ReplicationStats) Delete(ctx context.Context, bucket string) { + if r == nil { + return + } + + r.Lock() + defer r.Unlock() + delete(r.Cache, bucket) +} + +// Update updates in-memory replication statistics with new values. 
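// A small sketch (illustrative values, struct copied from above) of the
// wire format the json tags on BucketReplicationStats produce; this is the
// body the GetBucketReplicationMetrics handler encodes.
package main

import (
	"encoding/json"
	"fmt"
)

type BucketReplicationStats struct {
	PendingSize    uint64 `json:"pendingReplicationSize"`
	ReplicatedSize uint64 `json:"completedReplicationSize"`
	ReplicaSize    uint64 `json:"replicaSize"`
	FailedSize     uint64 `json:"failedReplicationSize"`
	PendingCount   uint64 `json:"pendingReplicationCount"`
	FailedCount    uint64 `json:"failedReplicationCount"`
}

func main() {
	out, _ := json.Marshal(BucketReplicationStats{PendingSize: 1 << 20, PendingCount: 3})
	fmt.Println(string(out))
	// {"pendingReplicationSize":1048576,"completedReplicationSize":0,"replicaSize":0,
	//  "failedReplicationSize":0,"pendingReplicationCount":3,"failedReplicationCount":0}
	// (shown wrapped; json.Marshal emits a single line)
}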
+func (r *ReplicationStats) Update(ctx context.Context, bucket string, n int64, status, prevStatus replication.StatusType, opType replication.Type) {
+	if r == nil {
+		return
+	}
+
+	r.RLock()
+	b, ok := r.Cache[bucket]
+	r.RUnlock()
+	if !ok {
+		b = &BucketReplicationStats{}
+		r.Lock()
+		// Re-check under the write lock and register the new entry so that
+		// concurrent updaters share a single counter set.
+		if st, found := r.Cache[bucket]; found {
+			b = st
+		} else {
+			r.Cache[bucket] = b
+		}
+		r.Unlock()
+	}
+
+	switch status {
+	case replication.Pending:
+		if opType == replication.ObjectReplicationType {
+			atomic.AddUint64(&b.PendingSize, uint64(n))
+		}
+		atomic.AddUint64(&b.PendingCount, 1)
+	case replication.Completed:
+		switch prevStatus { // adjust counters based on previous state
+		case replication.Pending:
+			atomic.AddUint64(&b.PendingCount, ^uint64(0))
+		case replication.Failed:
+			atomic.AddUint64(&b.FailedCount, ^uint64(0))
+		}
+		if opType == replication.ObjectReplicationType {
+			atomic.AddUint64(&b.ReplicatedSize, uint64(n))
+			switch prevStatus {
+			case replication.Pending:
+				atomic.AddUint64(&b.PendingSize, ^uint64(n-1))
+			case replication.Failed:
+				atomic.AddUint64(&b.FailedSize, ^uint64(n-1))
+			}
+		}
+	case replication.Failed:
+		// count failures only once, not on every retry
+		if opType == replication.ObjectReplicationType {
+			if prevStatus == replication.Pending {
+				atomic.AddUint64(&b.FailedSize, uint64(n))
+				atomic.AddUint64(&b.FailedCount, 1)
+			}
+		}
+	case replication.Replica:
+		if opType == replication.ObjectReplicationType {
+			atomic.AddUint64(&b.ReplicaSize, uint64(n))
+		}
+	}
+}
+
+// Get returns a point-in-time copy of the replication statistics for a bucket.
+func (r *ReplicationStats) Get(bucket string) BucketReplicationStats {
+	if r == nil {
+		return BucketReplicationStats{}
+	}
+
+	r.RLock()
+	defer r.RUnlock()
+	st, ok := r.Cache[bucket]
+	if !ok {
+		return BucketReplicationStats{}
+	}
+	return BucketReplicationStats{
+		PendingSize:    atomic.LoadUint64(&st.PendingSize),
+		FailedSize:     atomic.LoadUint64(&st.FailedSize),
+		ReplicatedSize: atomic.LoadUint64(&st.ReplicatedSize),
+		ReplicaSize:    atomic.LoadUint64(&st.ReplicaSize),
+		PendingCount:   atomic.LoadUint64(&st.PendingCount),
+		FailedCount:    atomic.LoadUint64(&st.FailedCount),
+	}
+}
+
+// NewReplicationStats initializes in-memory replication statistics
+func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *ReplicationStats {
+	st := &ReplicationStats{
+		Cache: make(map[string]*BucketReplicationStats),
+	}
+
+	dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI)
+	if err != nil {
+		return st
+	}
+
+	// data usage has not captured any data yet.
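// The ^uint64(...) arithmetic in Update above is the subtraction idiom
// from the sync/atomic documentation: to subtract a positive value c from
// an unsigned counter, add its two's complement, AddUint64(&x, ^uint64(c-1));
// for c == 1 that is AddUint64(&x, ^uint64(0)). A minimal check:
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var pending uint64 = 10
	atomic.AddUint64(&pending, ^uint64(0))   // decrement by 1 -> 9
	atomic.AddUint64(&pending, ^uint64(4-1)) // subtract 4     -> 5
	fmt.Println(atomic.LoadUint64(&pending)) // 5
}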
+ if dataUsageInfo.LastUpdate.IsZero() { + return st + } + + for bucket, usage := range dataUsageInfo.BucketsUsage { + b := &BucketReplicationStats{ + PendingSize: usage.ReplicationPendingSize, + FailedSize: usage.ReplicationFailedSize, + ReplicatedSize: usage.ReplicatedSize, + ReplicaSize: usage.ReplicaSize, + PendingCount: usage.ReplicationPendingCount, + FailedCount: usage.ReplicationFailedCount, + } + if b.hasReplicationUsage() { + st.Cache[bucket] = b + } + } + + return st +} diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 7633f159b..92dcd74dd 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -291,6 +291,13 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA versionPurgeStatus = Complete } } + prevStatus := dobj.DeleteMarkerReplicationStatus + currStatus := replicationStatus + if dobj.VersionID != "" { + prevStatus = string(dobj.VersionPurgeStatus) + currStatus = string(versionPurgeStatus) + } + globalReplicationStats.Update(ctx, dobj.Bucket, 0, replication.StatusType(currStatus), replication.StatusType(prevStatus), replication.DeleteReplicationType) // to decrement pending count var eventName = event.ObjectReplicationComplete if replicationStatus == string(replication.Failed) || versionPurgeStatus == Failed { @@ -594,7 +601,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa }) return } - gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{ + gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, writeLock, ObjectOptions{ VersionID: objInfo.VersionID, }) if err != nil { @@ -604,10 +611,10 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa Object: objInfo, Host: "Internal: [Replication]", }) - logger.LogIf(ctx, err) + logger.LogIf(ctx, fmt.Errorf("Unable to update replicate for %s/%s(%s): %w", bucket, object, objInfo.VersionID, err)) return } - defer gr.Close() // hold read lock for entire transaction + defer gr.Close() // hold write lock for entire transaction objInfo = gr.ObjInfo size, err := objInfo.GetActualSize() @@ -644,7 +651,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa rtype = getReplicationAction(objInfo, oi) if rtype == replicateNone { // object with same VersionID already exists, replication kicked off by - // PutObject might have completed. 
+ // PutObject might have completed return } } @@ -656,7 +663,8 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa srcOpts := miniogo.CopySrcOptions{ Bucket: dest.Bucket, Object: object, - VersionID: objInfo.VersionID} + VersionID: objInfo.VersionID, + } dstOpts := miniogo.PutObjectOptions{ Internal: miniogo.AdvancedPutOptions{ SourceVersionID: objInfo.VersionID, @@ -712,6 +720,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa defer r.Close() } + prevReplStatus := objInfo.ReplicationStatus objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String() if objInfo.UserTags != "" { objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags @@ -736,6 +745,11 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) } } + opType := replication.MetadataReplicationType + if rtype == replicateAll { + opType = replication.ObjectReplicationType + } + globalReplicationStats.Update(ctx, bucket, size, replicationStatus, prevReplStatus, opType) sendEvent(eventArgs{ EventName: eventName, BucketName: bucket, @@ -774,38 +788,61 @@ type DeletedObjectVersionInfo struct { } var ( - globalReplicationPool *ReplicationPool + globalReplicationPool *ReplicationPool + globalReplicationStats *ReplicationStats ) // ReplicationPool describes replication pool type ReplicationPool struct { - mu sync.Mutex - size int - replicaCh chan ObjectInfo - replicaDeleteCh chan DeletedObjectVersionInfo - killCh chan struct{} - wg sync.WaitGroup - ctx context.Context - objLayer ObjectLayer + mu sync.Mutex + size int + replicaCh chan ObjectInfo + replicaDeleteCh chan DeletedObjectVersionInfo + mrfReplicaCh chan ObjectInfo + mrfReplicaDeleteCh chan DeletedObjectVersionInfo + killCh chan struct{} + wg sync.WaitGroup + ctx context.Context + objLayer ObjectLayer } // NewReplicationPool creates a pool of replication workers of specified size func NewReplicationPool(ctx context.Context, o ObjectLayer, sz int) *ReplicationPool { pool := &ReplicationPool{ - replicaCh: make(chan ObjectInfo, 10000), - replicaDeleteCh: make(chan DeletedObjectVersionInfo, 10000), - ctx: ctx, - objLayer: o, + replicaCh: make(chan ObjectInfo, 1000), + replicaDeleteCh: make(chan DeletedObjectVersionInfo, 1000), + mrfReplicaCh: make(chan ObjectInfo, 100000), + mrfReplicaDeleteCh: make(chan DeletedObjectVersionInfo, 100000), + ctx: ctx, + objLayer: o, } - go func() { - <-ctx.Done() - close(pool.replicaCh) - close(pool.replicaDeleteCh) - }() pool.Resize(sz) + // add long running worker for handling most recent failures/pending replications + go pool.AddMRFWorker() return pool } +// AddMRFWorker adds a pending/failed replication worker to handle requests that could not be queued +// to the other workers +func (p *ReplicationPool) AddMRFWorker() { + for { + select { + case <-p.ctx.Done(): + return + case oi, ok := <-p.mrfReplicaCh: + if !ok { + return + } + replicateObject(p.ctx, oi, p.objLayer) + case doi, ok := <-p.mrfReplicaDeleteCh: + if !ok { + return + } + replicateDelete(p.ctx, doi, p.objLayer) + } + } +} + // AddWorker adds a replication worker to the pool func (p *ReplicationPool) AddWorker() { defer p.wg.Done() @@ -846,28 +883,39 @@ func (p *ReplicationPool) Resize(n int) { } } -func (p *ReplicationPool) queueReplicaTask(oi ObjectInfo) { +func (p *ReplicationPool) queueReplicaTask(ctx context.Context, oi 
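// A sketch of the spill-over queueing idea behind queueReplicaTask and
// AddMRFWorker (channel names and the int task type are illustrative):
// send to whichever buffered channel has room (Go's select chooses
// randomly among ready cases) and drop only when both are full, leaving
// the scanner to requeue the object on a later pass.
package main

import "fmt"

func queue(workCh, mrfCh chan int, task int) {
	select {
	case workCh <- task:
	case mrfCh <- task: // spill-over queue for pending/failed operations
	default:
		fmt.Println("both queues full, dropping task", task)
	}
}

func main() {
	workCh := make(chan int, 1)
	mrfCh := make(chan int, 1)
	for task := 0; task < 3; task++ {
		queue(workCh, mrfCh, task) // the third send is dropped
	}
	fmt.Println(len(workCh), len(mrfCh)) // 1 1
}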
ObjectInfo) { if p == nil { return } select { + case <-ctx.Done(): + close(p.replicaCh) + close(p.mrfReplicaCh) case p.replicaCh <- oi: + case p.mrfReplicaCh <- oi: + // queue all overflows into the mrfReplicaCh to handle incoming pending/failed operations default: } } -func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectVersionInfo) { +func (p *ReplicationPool) queueReplicaDeleteTask(ctx context.Context, doi DeletedObjectVersionInfo) { if p == nil { return } select { + case <-ctx.Done(): + close(p.replicaDeleteCh) + close(p.mrfReplicaDeleteCh) case p.replicaDeleteCh <- doi: + case p.mrfReplicaDeleteCh <- doi: + // queue all overflows into the mrfReplicaDeleteCh to handle incoming pending/failed operations default: } } func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) { globalReplicationPool = NewReplicationPool(ctx, objectAPI, globalAPIConfig.getReplicationWorkers()) + globalReplicationStats = NewReplicationStats(ctx, objectAPI) } // get Reader from replication target if active-active replication is in place and @@ -1003,11 +1051,14 @@ func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, op return oi, proxy, err } -func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, sync bool) { +func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, sync bool, opType replication.Type) { if sync { replicateObject(ctx, objInfo, o) } else { - globalReplicationPool.queueReplicaTask(objInfo) + globalReplicationPool.queueReplicaTask(GlobalContext, objInfo) + } + if sz, err := objInfo.GetActualSize(); err == nil { + globalReplicationStats.Update(ctx, objInfo.Bucket, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType) } } @@ -1015,6 +1066,7 @@ func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectVersionInfo, if sync { replicateDelete(ctx, dv, o) } else { - globalReplicationPool.queueReplicaDeleteTask(dv) + globalReplicationPool.queueReplicaDeleteTask(GlobalContext, dv) } + globalReplicationStats.Update(ctx, dv.Bucket, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType) } diff --git a/cmd/bucket-targets.go b/cmd/bucket-targets.go index fdc4fbc85..4847f2d4d 100644 --- a/cmd/bucket-targets.go +++ b/cmd/bucket-targets.go @@ -31,6 +31,7 @@ import ( miniogo "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio/cmd/crypto" + "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/bucket/versioning" "github.com/minio/minio/pkg/madmin" ) @@ -328,6 +329,7 @@ func (sys *BucketTargetSys) load(ctx context.Context, buckets []BucketInfo, objA for _, bucket := range buckets { cfg, err := globalBucketMetadataSys.GetBucketTargetsConfig(bucket.Name) if err != nil { + logger.LogIf(ctx, err) continue } if cfg == nil || cfg.Empty() { @@ -339,6 +341,7 @@ func (sys *BucketTargetSys) load(ctx context.Context, buckets []BucketInfo, objA for _, tgt := range cfg.Targets { tgtClient, err := sys.getRemoteTargetClient(&tgt) if err != nil { + logger.LogIf(ctx, err) continue } sys.arnRemotesMap[tgt.Arn] = tgtClient @@ -432,7 +435,10 @@ func parseBucketTargetConfig(bucket string, cdata, cmetadata []byte) (*madmin.Bu return nil, err } if crypto.S3.IsEncrypted(meta) { - if data, err = decryptBucketMetadata(cdata, bucket, meta, crypto.Context{bucket: bucket, bucketTargetsFile: bucketTargetsFile}); err != nil { + if data, err = decryptBucketMetadata(cdata, bucket, meta, crypto.Context{ + bucket: bucket, + 
bucketTargetsFile: bucketTargetsFile, + }); err != nil { return nil, err } } diff --git a/cmd/config/api/api.go b/cmd/config/api/api.go index 2c89a326d..3e1bb1193 100644 --- a/cmd/config/api/api.go +++ b/cmd/config/api/api.go @@ -29,14 +29,15 @@ import ( // API sub-system constants const ( - apiRequestsMax = "requests_max" - apiRequestsDeadline = "requests_deadline" - apiClusterDeadline = "cluster_deadline" - apiCorsAllowOrigin = "cors_allow_origin" - apiRemoteTransportDeadline = "remote_transport_deadline" - apiListQuorum = "list_quorum" - apiExtendListCacheLife = "extend_list_cache_life" - apiReplicationWorkers = "replication_workers" + apiRequestsMax = "requests_max" + apiRequestsDeadline = "requests_deadline" + apiClusterDeadline = "cluster_deadline" + apiCorsAllowOrigin = "cors_allow_origin" + apiRemoteTransportDeadline = "remote_transport_deadline" + apiListQuorum = "list_quorum" + apiExtendListCacheLife = "extend_list_cache_life" + apiReplicationWorkers = "replication_workers" + EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX" EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE" EnvAPIClusterDeadline = "MINIO_API_CLUSTER_DEADLINE" @@ -87,7 +88,7 @@ var ( }, config.KV{ Key: apiReplicationWorkers, - Value: "100", + Value: "500", }, } ) diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index d814692a9..e0cd7fdcc 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -134,7 +134,7 @@ func runDataScanner(ctx context.Context, objAPI ObjectLayer) { } // Wait before starting next cycle and wait on startup. - results := make(chan DataUsageInfo, 1) + results := make(chan madmin.DataUsageInfo, 1) go storeDataUsageInBackend(ctx, objAPI, results) bf, err := globalNotificationSys.updateBloomFilter(ctx, nextBloomCycle) logger.LogIf(ctx, err) @@ -790,6 +790,8 @@ type sizeSummary struct { pendingSize int64 failedSize int64 replicaSize int64 + pendingCount uint64 + failedCount uint64 } type getSizeFn func(item scannerItem) (sizeSummary, error) @@ -1105,11 +1107,13 @@ func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi Obj } switch oi.ReplicationStatus { case replication.Pending: + sizeS.pendingCount++ sizeS.pendingSize += oi.Size - globalReplicationPool.queueReplicaTask(oi) + globalReplicationPool.queueReplicaTask(ctx, oi) case replication.Failed: sizeS.failedSize += oi.Size - globalReplicationPool.queueReplicaTask(oi) + sizeS.failedCount++ + globalReplicationPool.queueReplicaTask(ctx, oi) case replication.Completed, "COMPLETE": sizeS.replicatedSize += oi.Size case replication.Replica: @@ -1128,7 +1132,7 @@ func (i *scannerItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, } else { versionID = oi.VersionID } - globalReplicationPool.queueReplicaDeleteTask(DeletedObjectVersionInfo{ + globalReplicationPool.queueReplicaDeleteTask(ctx, DeletedObjectVersionInfo{ DeletedObject: DeletedObject{ ObjectName: oi.Name, DeleteMarkerVersionID: dmVersionID, diff --git a/cmd/data-update-tracker.go b/cmd/data-update-tracker.go index 5e2c12767..efd563ed5 100644 --- a/cmd/data-update-tracker.go +++ b/cmd/data-update-tracker.go @@ -31,11 +31,9 @@ import ( "sync" "time" - "github.com/minio/minio/cmd/config" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/color" "github.com/minio/minio/pkg/console" - "github.com/minio/minio/pkg/env" "github.com/willf/bloom" ) @@ -80,7 +78,7 @@ func newDataUpdateTracker() *dataUpdateTracker { Current: dataUpdateFilter{ idx: 1, }, - debug: env.Get(envDataUsageScannerDebug, config.EnableOff) == config.EnableOn || serverDebugLog, 
+		debug:      serverDebugLog,
 		input:      make(chan string, dataUpdateTrackerQueueSize),
 		save:       make(chan struct{}, 1),
 		saveExited: make(chan struct{}),
diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go
index b0a854b1a..23276396c 100644
--- a/cmd/data-usage-cache.go
+++ b/cmd/data-usage-cache.go
@@ -32,6 +32,7 @@ import (
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/bucket/lifecycle"
 	"github.com/minio/minio/pkg/hash"
+	"github.com/minio/minio/pkg/madmin"
 	"github.com/tinylib/msgp/msgp"
 )
@@ -45,15 +46,26 @@ type sizeHistogram [dataUsageBucketLen]uint64
 //msgp:tuple dataUsageEntry
 type dataUsageEntry struct {
+	Children dataUsageHashMap
 	// These fields do not include any children.
-	Size                   int64
-	ReplicatedSize         uint64
-	ReplicationPendingSize uint64
-	ReplicationFailedSize  uint64
-	ReplicaSize            uint64
-	Objects                uint64
-	ObjSizes               sizeHistogram
-	Children               dataUsageHashMap
+	Size             int64
+	Objects          uint64
+	ObjSizes         sizeHistogram
+	ReplicationStats replicationStats
+}
+
+//msgp:tuple replicationStats
+type replicationStats struct {
+	PendingSize          uint64
+	ReplicatedSize       uint64
+	FailedSize           uint64
+	ReplicaSize          uint64
+	FailedCount          uint64
+	PendingCount         uint64
+	MissedThresholdSize  uint64
+	AfterThresholdSize   uint64
+	MissedThresholdCount uint64
+	AfterThresholdCount  uint64
 }

 //msgp:tuple dataUsageEntryV2
@@ -65,20 +77,40 @@ type dataUsageEntryV2 struct {
 	Children dataUsageHashMap
 }

-// dataUsageCache contains a cache of data usage entries latest version 3.
-type dataUsageCache struct {
-	Info  dataUsageCacheInfo
-	Disks []string
-	Cache map[string]dataUsageEntry
+//msgp:tuple dataUsageEntryV3
+type dataUsageEntryV3 struct {
+	// These fields do not include any children.
+	Size                   int64
+	ReplicatedSize         uint64
+	ReplicationPendingSize uint64
+	ReplicationFailedSize  uint64
+	ReplicaSize            uint64
+	Objects                uint64
+	ObjSizes               sizeHistogram
+	Children               dataUsageHashMap
 }

-// dataUsageCache contains a cache of data usage entries version 2.
+// dataUsageCache contains a cache of data usage entries, latest version 4.
+type dataUsageCache struct {
+	Info  dataUsageCacheInfo
+	Cache map[string]dataUsageEntry
+	Disks []string
+}
+
+// dataUsageCacheV2 contains a cache of data usage entries version 2.
 type dataUsageCacheV2 struct {
 	Info  dataUsageCacheInfo
 	Disks []string
 	Cache map[string]dataUsageEntryV2
 }

+// dataUsageCacheV3 contains a cache of data usage entries version 3.
+type dataUsageCacheV3 struct {
+	Info  dataUsageCacheInfo
+	Disks []string
+	Cache map[string]dataUsageEntryV3
+}
+
 //msgp:ignore dataUsageEntryInfo
 type dataUsageEntryInfo struct {
 	Name string
@@ -89,8 +121,8 @@ type dataUsageEntryInfo struct {
 type dataUsageCacheInfo struct {
 	// Name of the bucket. Also root element.
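// Why the frozen dataUsageEntryV2/V3 copies above exist: //msgp:tuple
// structs serialize positionally, as an array header followed by the
// fields in declaration order, with no field names on the wire.
// Reordering or nesting fields is therefore a wire-format break, so old
// payloads must be decoded with the old layout and upgraded. A hand-rolled
// sketch of tuple encoding using msgp primitives (values illustrative):
package main

import (
	"bytes"
	"fmt"

	"github.com/tinylib/msgp/msgp"
)

func main() {
	var buf bytes.Buffer
	w := msgp.NewWriter(&buf)
	w.WriteArrayHeader(2) // a two-field tuple
	w.WriteInt64(1024)    // Size
	w.WriteUint64(7)      // Objects
	w.Flush()
	// First byte is 0x92: a fixarray of two elements, no field names.
	fmt.Printf("0x%x\n", buf.Bytes()[0])
}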
Name string - LastUpdate time.Time NextCycle uint32 + LastUpdate time.Time // indicates if the disk is being healed and scanner // should skip healing the disk SkipHealing bool @@ -100,20 +132,25 @@ type dataUsageCacheInfo struct { func (e *dataUsageEntry) addSizes(summary sizeSummary) { e.Size += summary.totalSize - e.ReplicatedSize += uint64(summary.replicatedSize) - e.ReplicationFailedSize += uint64(summary.failedSize) - e.ReplicationPendingSize += uint64(summary.pendingSize) - e.ReplicaSize += uint64(summary.replicaSize) + e.ReplicationStats.ReplicatedSize += uint64(summary.replicatedSize) + e.ReplicationStats.FailedSize += uint64(summary.failedSize) + e.ReplicationStats.PendingSize += uint64(summary.pendingSize) + e.ReplicationStats.ReplicaSize += uint64(summary.replicaSize) + e.ReplicationStats.PendingCount += uint64(summary.pendingCount) + e.ReplicationStats.FailedCount += uint64(summary.failedCount) + } // merge other data usage entry into this, excluding children. func (e *dataUsageEntry) merge(other dataUsageEntry) { e.Objects += other.Objects e.Size += other.Size - e.ReplicationPendingSize += other.ReplicationPendingSize - e.ReplicationFailedSize += other.ReplicationFailedSize - e.ReplicatedSize += other.ReplicatedSize - e.ReplicaSize += other.ReplicaSize + e.ReplicationStats.PendingSize += other.ReplicationStats.PendingSize + e.ReplicationStats.FailedSize += other.ReplicationStats.FailedSize + e.ReplicationStats.ReplicatedSize += other.ReplicationStats.ReplicatedSize + e.ReplicationStats.ReplicaSize += other.ReplicationStats.ReplicaSize + e.ReplicationStats.PendingCount += other.ReplicationStats.PendingCount + e.ReplicationStats.FailedCount += other.ReplicationStats.FailedCount for i, v := range other.ObjSizes[:] { e.ObjSizes[i] += v @@ -238,25 +275,27 @@ func (d *dataUsageCache) keepRootChildren(list map[dataUsageHash]struct{}) { } } -// dui converts the flattened version of the path to DataUsageInfo. +// dui converts the flattened version of the path to madmin.DataUsageInfo. // As a side effect d will be flattened, use a clone if this is not ok. -func (d *dataUsageCache) dui(path string, buckets []BucketInfo) DataUsageInfo { +func (d *dataUsageCache) dui(path string, buckets []BucketInfo) madmin.DataUsageInfo { e := d.find(path) if e == nil { // No entry found, return empty. 
- return DataUsageInfo{} + return madmin.DataUsageInfo{} } flat := d.flatten(*e) - return DataUsageInfo{ - LastUpdate: d.Info.LastUpdate, - ObjectsTotalCount: flat.Objects, - ObjectsTotalSize: uint64(flat.Size), - ReplicatedSize: flat.ReplicatedSize, - ReplicationFailedSize: flat.ReplicationFailedSize, - ReplicationPendingSize: flat.ReplicationPendingSize, - ReplicaSize: flat.ReplicaSize, - BucketsCount: uint64(len(e.Children)), - BucketsUsage: d.bucketsUsageInfo(buckets), + return madmin.DataUsageInfo{ + LastUpdate: d.Info.LastUpdate, + ObjectsTotalCount: flat.Objects, + ObjectsTotalSize: uint64(flat.Size), + ReplicatedSize: flat.ReplicationStats.ReplicatedSize, + ReplicationFailedSize: flat.ReplicationStats.FailedSize, + ReplicationPendingSize: flat.ReplicationStats.PendingSize, + ReplicaSize: flat.ReplicationStats.ReplicaSize, + ReplicationPendingCount: flat.ReplicationStats.PendingCount, + ReplicationFailedCount: flat.ReplicationStats.FailedCount, + BucketsCount: uint64(len(e.Children)), + BucketsUsage: d.bucketsUsageInfo(buckets), } } @@ -373,22 +412,24 @@ func (h *sizeHistogram) toMap() map[string]uint64 { // bucketsUsageInfo returns the buckets usage info as a map, with // key as bucket name -func (d *dataUsageCache) bucketsUsageInfo(buckets []BucketInfo) map[string]BucketUsageInfo { - var dst = make(map[string]BucketUsageInfo, len(buckets)) +func (d *dataUsageCache) bucketsUsageInfo(buckets []BucketInfo) map[string]madmin.BucketUsageInfo { + var dst = make(map[string]madmin.BucketUsageInfo, len(buckets)) for _, bucket := range buckets { e := d.find(bucket.Name) if e == nil { continue } flat := d.flatten(*e) - dst[bucket.Name] = BucketUsageInfo{ - Size: uint64(flat.Size), - ObjectsCount: flat.Objects, - ReplicationPendingSize: flat.ReplicationPendingSize, - ReplicatedSize: flat.ReplicatedSize, - ReplicationFailedSize: flat.ReplicationFailedSize, - ReplicaSize: flat.ReplicaSize, - ObjectSizesHistogram: flat.ObjSizes.toMap(), + dst[bucket.Name] = madmin.BucketUsageInfo{ + Size: uint64(flat.Size), + ObjectsCount: flat.Objects, + ReplicationPendingSize: flat.ReplicationStats.PendingSize, + ReplicatedSize: flat.ReplicationStats.ReplicatedSize, + ReplicationFailedSize: flat.ReplicationStats.FailedSize, + ReplicationPendingCount: flat.ReplicationStats.PendingCount, + ReplicationFailedCount: flat.ReplicationStats.FailedCount, + ReplicaSize: flat.ReplicationStats.ReplicaSize, + ObjectSizesHistogram: flat.ObjSizes.toMap(), } } return dst @@ -396,20 +437,22 @@ func (d *dataUsageCache) bucketsUsageInfo(buckets []BucketInfo) map[string]Bucke // bucketUsageInfo returns the buckets usage info. // If not found all values returned are zero values. 
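// A hedged sketch of reading the new counters over the admin API. It
// assumes placeholder endpoint and credentials, and uses the
// AdminClient.DataUsageInfo call from pkg/madmin/info-commands.go (a file
// this patch touches); the ReplicationPending/FailedCount fields are the
// madmin.BucketUsageInfo additions made above.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/minio/minio/pkg/madmin"
)

func main() {
	admClient, err := madmin.New("localhost:9000", "ACCESS-KEY", "SECRET-KEY", false)
	if err != nil {
		log.Fatal(err)
	}
	dui, err := admClient.DataUsageInfo(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	for bucket, u := range dui.BucketsUsage {
		fmt.Printf("%s: pending=%d failed=%d replicated=%d bytes\n",
			bucket, u.ReplicationPendingCount, u.ReplicationFailedCount, u.ReplicatedSize)
	}
}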
-func (d *dataUsageCache) bucketUsageInfo(bucket string) BucketUsageInfo { +func (d *dataUsageCache) bucketUsageInfo(bucket string) madmin.BucketUsageInfo { e := d.find(bucket) if e == nil { - return BucketUsageInfo{} + return madmin.BucketUsageInfo{} } flat := d.flatten(*e) - return BucketUsageInfo{ - Size: uint64(flat.Size), - ObjectsCount: flat.Objects, - ReplicationPendingSize: flat.ReplicationPendingSize, - ReplicatedSize: flat.ReplicatedSize, - ReplicationFailedSize: flat.ReplicationFailedSize, - ReplicaSize: flat.ReplicaSize, - ObjectSizesHistogram: flat.ObjSizes.toMap(), + return madmin.BucketUsageInfo{ + Size: uint64(flat.Size), + ObjectsCount: flat.Objects, + ReplicationPendingSize: flat.ReplicationStats.PendingSize, + ReplicationPendingCount: flat.ReplicationStats.PendingCount, + ReplicatedSize: flat.ReplicationStats.ReplicatedSize, + ReplicationFailedSize: flat.ReplicationStats.FailedSize, + ReplicationFailedCount: flat.ReplicationStats.FailedCount, + ReplicaSize: flat.ReplicationStats.ReplicaSize, + ObjectSizesHistogram: flat.ObjSizes.toMap(), } } @@ -533,6 +576,7 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string) // Bumping the cache version will drop data from previous versions // and write new data with the new version. const ( + dataUsageCacheVerV4 = 4 dataUsageCacheVerV3 = 3 dataUsageCacheVerV2 = 2 dataUsageCacheVerV1 = 1 @@ -541,7 +585,7 @@ const ( // serialize the contents of the cache. func (d *dataUsageCache) serializeTo(dst io.Writer) error { // Add version and compress. - _, err := dst.Write([]byte{dataUsageCacheVerV3}) + _, err := dst.Write([]byte{dataUsageCacheVerV4}) if err != nil { return err } @@ -609,6 +653,35 @@ func (d *dataUsageCache) deserialize(r io.Reader) error { return err } defer dec.Close() + dold := &dataUsageCacheV3{} + if err = dold.DecodeMsg(msgp.NewReader(dec)); err != nil { + return err + } + d.Info = dold.Info + d.Disks = dold.Disks + d.Cache = make(map[string]dataUsageEntry, len(dold.Cache)) + for k, v := range dold.Cache { + d.Cache[k] = dataUsageEntry{ + Size: v.Size, + Objects: v.Objects, + ObjSizes: v.ObjSizes, + Children: v.Children, + ReplicationStats: replicationStats{ + ReplicatedSize: v.ReplicatedSize, + ReplicaSize: v.ReplicaSize, + FailedSize: v.ReplicationFailedSize, + PendingSize: v.ReplicationPendingSize, + }, + } + } + return nil + case dataUsageCacheVerV4: + // Zstd compressed. 
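// The framing used by serializeTo/deserialize above, in isolation: one
// leading version byte selects the struct layout, and the remainder is a
// zstd-compressed msgpack payload. A minimal roundtrip of just that
// framing (payload contents illustrative):
package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/klauspost/compress/zstd"
)

const verV4 = 4

func writeVersioned(w io.Writer, payload []byte) error {
	if _, err := w.Write([]byte{verV4}); err != nil {
		return err
	}
	enc, err := zstd.NewWriter(w)
	if err != nil {
		return err
	}
	if _, err := enc.Write(payload); err != nil {
		enc.Close()
		return err
	}
	return enc.Close()
}

func readVersioned(r io.Reader) (byte, []byte, error) {
	var ver [1]byte
	if _, err := io.ReadFull(r, ver[:]); err != nil {
		return 0, nil, err
	}
	dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(2))
	if err != nil {
		return 0, nil, err
	}
	defer dec.Close()
	payload, err := io.ReadAll(dec)
	return ver[0], payload, err
}

func main() {
	var buf bytes.Buffer
	if err := writeVersioned(&buf, []byte("msgp payload")); err != nil {
		panic(err)
	}
	ver, payload, err := readVersioned(&buf)
	fmt.Println(ver, string(payload), err) // 4 msgp payload <nil>
}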
+ dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(2)) + if err != nil { + return err + } + defer dec.Close() return d.DecodeMsg(msgp.NewReader(dec)) } diff --git a/cmd/data-usage-cache_gen.go b/cmd/data-usage-cache_gen.go index a686d2de6..218d26153 100644 --- a/cmd/data-usage-cache_gen.go +++ b/cmd/data-usage-cache_gen.go @@ -30,54 +30,54 @@ func (z *dataUsageCache) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Info") return } - case "Disks": - var zb0002 uint32 - zb0002, err = dc.ReadArrayHeader() - if err != nil { - err = msgp.WrapError(err, "Disks") - return - } - if cap(z.Disks) >= int(zb0002) { - z.Disks = (z.Disks)[:zb0002] - } else { - z.Disks = make([]string, zb0002) - } - for za0001 := range z.Disks { - z.Disks[za0001], err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "Disks", za0001) - return - } - } case "Cache": - var zb0003 uint32 - zb0003, err = dc.ReadMapHeader() + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() if err != nil { err = msgp.WrapError(err, "Cache") return } if z.Cache == nil { - z.Cache = make(map[string]dataUsageEntry, zb0003) + z.Cache = make(map[string]dataUsageEntry, zb0002) } else if len(z.Cache) > 0 { for key := range z.Cache { delete(z.Cache, key) } } - for zb0003 > 0 { - zb0003-- - var za0002 string - var za0003 dataUsageEntry - za0002, err = dc.ReadString() + for zb0002 > 0 { + zb0002-- + var za0001 string + var za0002 dataUsageEntry + za0001, err = dc.ReadString() if err != nil { err = msgp.WrapError(err, "Cache") return } - err = za0003.DecodeMsg(dc) + err = za0002.DecodeMsg(dc) if err != nil { - err = msgp.WrapError(err, "Cache", za0002) + err = msgp.WrapError(err, "Cache", za0001) + return + } + z.Cache[za0001] = za0002 + } + case "Disks": + var zb0003 uint32 + zb0003, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Disks") + return + } + if cap(z.Disks) >= int(zb0003) { + z.Disks = (z.Disks)[:zb0003] + } else { + z.Disks = make([]string, zb0003) + } + for za0003 := range z.Disks { + z.Disks[za0003], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Disks", za0003) return } - z.Cache[za0002] = za0003 } default: err = dc.Skip() @@ -103,23 +103,6 @@ func (z *dataUsageCache) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Info") return } - // write "Disks" - err = en.Append(0xa5, 0x44, 0x69, 0x73, 0x6b, 0x73) - if err != nil { - return - } - err = en.WriteArrayHeader(uint32(len(z.Disks))) - if err != nil { - err = msgp.WrapError(err, "Disks") - return - } - for za0001 := range z.Disks { - err = en.WriteString(z.Disks[za0001]) - if err != nil { - err = msgp.WrapError(err, "Disks", za0001) - return - } - } // write "Cache" err = en.Append(0xa5, 0x43, 0x61, 0x63, 0x68, 0x65) if err != nil { @@ -130,15 +113,32 @@ func (z *dataUsageCache) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Cache") return } - for za0002, za0003 := range z.Cache { - err = en.WriteString(za0002) + for za0001, za0002 := range z.Cache { + err = en.WriteString(za0001) if err != nil { err = msgp.WrapError(err, "Cache") return } - err = za0003.EncodeMsg(en) + err = za0002.EncodeMsg(en) if err != nil { - err = msgp.WrapError(err, "Cache", za0002) + err = msgp.WrapError(err, "Cache", za0001) + return + } + } + // write "Disks" + err = en.Append(0xa5, 0x44, 0x69, 0x73, 0x6b, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Disks))) + if err != nil { + err = msgp.WrapError(err, "Disks") + return + } + for za0003 := range 
z.Disks { + err = en.WriteString(z.Disks[za0003]) + if err != nil { + err = msgp.WrapError(err, "Disks", za0003) return } } @@ -156,23 +156,23 @@ func (z *dataUsageCache) MarshalMsg(b []byte) (o []byte, err error) { err = msgp.WrapError(err, "Info") return } - // string "Disks" - o = append(o, 0xa5, 0x44, 0x69, 0x73, 0x6b, 0x73) - o = msgp.AppendArrayHeader(o, uint32(len(z.Disks))) - for za0001 := range z.Disks { - o = msgp.AppendString(o, z.Disks[za0001]) - } // string "Cache" o = append(o, 0xa5, 0x43, 0x61, 0x63, 0x68, 0x65) o = msgp.AppendMapHeader(o, uint32(len(z.Cache))) - for za0002, za0003 := range z.Cache { - o = msgp.AppendString(o, za0002) - o, err = za0003.MarshalMsg(o) + for za0001, za0002 := range z.Cache { + o = msgp.AppendString(o, za0001) + o, err = za0002.MarshalMsg(o) if err != nil { - err = msgp.WrapError(err, "Cache", za0002) + err = msgp.WrapError(err, "Cache", za0001) return } } + // string "Disks" + o = append(o, 0xa5, 0x44, 0x69, 0x73, 0x6b, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.Disks))) + for za0003 := range z.Disks { + o = msgp.AppendString(o, z.Disks[za0003]) + } return } @@ -200,54 +200,54 @@ func (z *dataUsageCache) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Info") return } - case "Disks": - var zb0002 uint32 - zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Disks") - return - } - if cap(z.Disks) >= int(zb0002) { - z.Disks = (z.Disks)[:zb0002] - } else { - z.Disks = make([]string, zb0002) - } - for za0001 := range z.Disks { - z.Disks[za0001], bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Disks", za0001) - return - } - } case "Cache": - var zb0003 uint32 - zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { err = msgp.WrapError(err, "Cache") return } if z.Cache == nil { - z.Cache = make(map[string]dataUsageEntry, zb0003) + z.Cache = make(map[string]dataUsageEntry, zb0002) } else if len(z.Cache) > 0 { for key := range z.Cache { delete(z.Cache, key) } } - for zb0003 > 0 { - var za0002 string - var za0003 dataUsageEntry - zb0003-- - za0002, bts, err = msgp.ReadStringBytes(bts) + for zb0002 > 0 { + var za0001 string + var za0002 dataUsageEntry + zb0002-- + za0001, bts, err = msgp.ReadStringBytes(bts) if err != nil { err = msgp.WrapError(err, "Cache") return } - bts, err = za0003.UnmarshalMsg(bts) + bts, err = za0002.UnmarshalMsg(bts) if err != nil { - err = msgp.WrapError(err, "Cache", za0002) + err = msgp.WrapError(err, "Cache", za0001) + return + } + z.Cache[za0001] = za0002 + } + case "Disks": + var zb0003 uint32 + zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Disks") + return + } + if cap(z.Disks) >= int(zb0003) { + z.Disks = (z.Disks)[:zb0003] + } else { + z.Disks = make([]string, zb0003) + } + for za0003 := range z.Disks { + z.Disks[za0003], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Disks", za0003) return } - z.Cache[za0002] = za0003 } default: bts, err = msgp.Skip(bts) @@ -263,17 +263,17 @@ func (z *dataUsageCache) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *dataUsageCache) Msgsize() (s int) { - s = 1 + 5 + z.Info.Msgsize() + 6 + msgp.ArrayHeaderSize - for za0001 := range z.Disks { - s += msgp.StringPrefixSize + len(z.Disks[za0001]) - } - s += 
6 + msgp.MapHeaderSize + s = 1 + 5 + z.Info.Msgsize() + 6 + msgp.MapHeaderSize if z.Cache != nil { - for za0002, za0003 := range z.Cache { - _ = za0003 - s += msgp.StringPrefixSize + len(za0002) + za0003.Msgsize() + for za0001, za0002 := range z.Cache { + _ = za0002 + s += msgp.StringPrefixSize + len(za0001) + za0002.Msgsize() } } + s += 6 + msgp.ArrayHeaderSize + for za0003 := range z.Disks { + s += msgp.StringPrefixSize + len(z.Disks[za0003]) + } return } @@ -301,18 +301,18 @@ func (z *dataUsageCacheInfo) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Name") return } - case "LastUpdate": - z.LastUpdate, err = dc.ReadTime() - if err != nil { - err = msgp.WrapError(err, "LastUpdate") - return - } case "NextCycle": z.NextCycle, err = dc.ReadUint32() if err != nil { err = msgp.WrapError(err, "NextCycle") return } + case "LastUpdate": + z.LastUpdate, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "LastUpdate") + return + } case "SkipHealing": z.SkipHealing, err = dc.ReadBool() if err != nil { @@ -363,16 +363,6 @@ func (z *dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Name") return } - // write "LastUpdate" - err = en.Append(0xaa, 0x4c, 0x61, 0x73, 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65) - if err != nil { - return - } - err = en.WriteTime(z.LastUpdate) - if err != nil { - err = msgp.WrapError(err, "LastUpdate") - return - } // write "NextCycle" err = en.Append(0xa9, 0x4e, 0x65, 0x78, 0x74, 0x43, 0x79, 0x63, 0x6c, 0x65) if err != nil { @@ -383,6 +373,16 @@ func (z *dataUsageCacheInfo) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "NextCycle") return } + // write "LastUpdate" + err = en.Append(0xaa, 0x4c, 0x61, 0x73, 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65) + if err != nil { + return + } + err = en.WriteTime(z.LastUpdate) + if err != nil { + err = msgp.WrapError(err, "LastUpdate") + return + } // write "SkipHealing" err = en.Append(0xab, 0x53, 0x6b, 0x69, 0x70, 0x48, 0x65, 0x61, 0x6c, 0x69, 0x6e, 0x67) if err != nil { @@ -426,12 +426,12 @@ func (z *dataUsageCacheInfo) MarshalMsg(b []byte) (o []byte, err error) { // string "Name" o = append(o, 0xa4, 0x4e, 0x61, 0x6d, 0x65) o = msgp.AppendString(o, z.Name) - // string "LastUpdate" - o = append(o, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65) - o = msgp.AppendTime(o, z.LastUpdate) // string "NextCycle" o = append(o, 0xa9, 0x4e, 0x65, 0x78, 0x74, 0x43, 0x79, 0x63, 0x6c, 0x65) o = msgp.AppendUint32(o, z.NextCycle) + // string "LastUpdate" + o = append(o, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65) + o = msgp.AppendTime(o, z.LastUpdate) // string "SkipHealing" o = append(o, 0xab, 0x53, 0x6b, 0x69, 0x70, 0x48, 0x65, 0x61, 0x6c, 0x69, 0x6e, 0x67) o = msgp.AppendBool(o, z.SkipHealing) @@ -467,18 +467,18 @@ func (z *dataUsageCacheInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Name") return } - case "LastUpdate": - z.LastUpdate, bts, err = msgp.ReadTimeBytes(bts) - if err != nil { - err = msgp.WrapError(err, "LastUpdate") - return - } case "NextCycle": z.NextCycle, bts, err = msgp.ReadUint32Bytes(bts) if err != nil { err = msgp.WrapError(err, "NextCycle") return } + case "LastUpdate": + z.LastUpdate, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "LastUpdate") + return + } case "SkipHealing": z.SkipHealing, bts, err = msgp.ReadBoolBytes(bts) if err != nil { @@ -505,7 +505,7 @@ func (z *dataUsageCacheInfo) UnmarshalMsg(bts []byte) (o []byte, 
err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *dataUsageCacheInfo) Msgsize() (s int) { - s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 11 + msgp.TimeSize + 10 + msgp.Uint32Size + 12 + msgp.BoolSize + 12 + msgp.BytesPrefixSize + len(z.BloomFilter) + s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 10 + msgp.Uint32Size + 11 + msgp.TimeSize + 12 + msgp.BoolSize + 12 + msgp.BytesPrefixSize + len(z.BloomFilter) return } @@ -780,6 +780,277 @@ func (z *dataUsageCacheV2) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *dataUsageCacheV3) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Info": + err = z.Info.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Info") + return + } + case "Disks": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Disks") + return + } + if cap(z.Disks) >= int(zb0002) { + z.Disks = (z.Disks)[:zb0002] + } else { + z.Disks = make([]string, zb0002) + } + for za0001 := range z.Disks { + z.Disks[za0001], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Disks", za0001) + return + } + } + case "Cache": + var zb0003 uint32 + zb0003, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + if z.Cache == nil { + z.Cache = make(map[string]dataUsageEntryV3, zb0003) + } else if len(z.Cache) > 0 { + for key := range z.Cache { + delete(z.Cache, key) + } + } + for zb0003 > 0 { + zb0003-- + var za0002 string + var za0003 dataUsageEntryV3 + za0002, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + err = za0003.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Cache", za0002) + return + } + z.Cache[za0002] = za0003 + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *dataUsageCacheV3) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 3 + // write "Info" + err = en.Append(0x83, 0xa4, 0x49, 0x6e, 0x66, 0x6f) + if err != nil { + return + } + err = z.Info.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Info") + return + } + // write "Disks" + err = en.Append(0xa5, 0x44, 0x69, 0x73, 0x6b, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Disks))) + if err != nil { + err = msgp.WrapError(err, "Disks") + return + } + for za0001 := range z.Disks { + err = en.WriteString(z.Disks[za0001]) + if err != nil { + err = msgp.WrapError(err, "Disks", za0001) + return + } + } + // write "Cache" + err = en.Append(0xa5, 0x43, 0x61, 0x63, 0x68, 0x65) + if err != nil { + return + } + err = en.WriteMapHeader(uint32(len(z.Cache))) + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + for za0002, za0003 := range z.Cache { + err = en.WriteString(za0002) + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + err = za0003.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Cache", za0002) + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *dataUsageCacheV3) MarshalMsg(b []byte) (o []byte, err error) { + o = 
msgp.Require(b, z.Msgsize()) + // map header, size 3 + // string "Info" + o = append(o, 0x83, 0xa4, 0x49, 0x6e, 0x66, 0x6f) + o, err = z.Info.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Info") + return + } + // string "Disks" + o = append(o, 0xa5, 0x44, 0x69, 0x73, 0x6b, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.Disks))) + for za0001 := range z.Disks { + o = msgp.AppendString(o, z.Disks[za0001]) + } + // string "Cache" + o = append(o, 0xa5, 0x43, 0x61, 0x63, 0x68, 0x65) + o = msgp.AppendMapHeader(o, uint32(len(z.Cache))) + for za0002, za0003 := range z.Cache { + o = msgp.AppendString(o, za0002) + o, err = za0003.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Cache", za0002) + return + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *dataUsageCacheV3) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Info": + bts, err = z.Info.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Info") + return + } + case "Disks": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Disks") + return + } + if cap(z.Disks) >= int(zb0002) { + z.Disks = (z.Disks)[:zb0002] + } else { + z.Disks = make([]string, zb0002) + } + for za0001 := range z.Disks { + z.Disks[za0001], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Disks", za0001) + return + } + } + case "Cache": + var zb0003 uint32 + zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + if z.Cache == nil { + z.Cache = make(map[string]dataUsageEntryV3, zb0003) + } else if len(z.Cache) > 0 { + for key := range z.Cache { + delete(z.Cache, key) + } + } + for zb0003 > 0 { + var za0002 string + var za0003 dataUsageEntryV3 + zb0003-- + za0002, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Cache") + return + } + bts, err = za0003.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Cache", za0002) + return + } + z.Cache[za0002] = za0003 + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *dataUsageCacheV3) Msgsize() (s int) { + s = 1 + 5 + z.Info.Msgsize() + 6 + msgp.ArrayHeaderSize + for za0001 := range z.Disks { + s += msgp.StringPrefixSize + len(z.Disks[za0001]) + } + s += 6 + msgp.MapHeaderSize + if z.Cache != nil { + for za0002, za0003 := range z.Cache { + _ = za0003 + s += msgp.StringPrefixSize + len(za0002) + za0003.Msgsize() + } + } + return +} + // DecodeMsg implements msgp.Decodable func (z *dataUsageEntry) DecodeMsg(dc *msgp.Reader) (err error) { var zb0001 uint32 @@ -788,8 +1059,13 @@ func (z *dataUsageEntry) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err) return } - if zb0001 != 8 { - err = msgp.ArrayError{Wanted: 8, Got: zb0001} + if zb0001 != 5 { + err = msgp.ArrayError{Wanted: 5, Got: zb0001} + return + } + err = z.Children.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Children") return } z.Size, err = 
dc.ReadInt64() @@ -797,26 +1073,6 @@ func (z *dataUsageEntry) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Size") return } - z.ReplicatedSize, err = dc.ReadUint64() - if err != nil { - err = msgp.WrapError(err, "ReplicatedSize") - return - } - z.ReplicationPendingSize, err = dc.ReadUint64() - if err != nil { - err = msgp.WrapError(err, "ReplicationPendingSize") - return - } - z.ReplicationFailedSize, err = dc.ReadUint64() - if err != nil { - err = msgp.WrapError(err, "ReplicationFailedSize") - return - } - z.ReplicaSize, err = dc.ReadUint64() - if err != nil { - err = msgp.WrapError(err, "ReplicaSize") - return - } z.Objects, err = dc.ReadUint64() if err != nil { err = msgp.WrapError(err, "Objects") @@ -839,9 +1095,9 @@ func (z *dataUsageEntry) DecodeMsg(dc *msgp.Reader) (err error) { return } } - err = z.Children.DecodeMsg(dc) + err = z.ReplicationStats.DecodeMsg(dc) if err != nil { - err = msgp.WrapError(err, "Children") + err = msgp.WrapError(err, "ReplicationStats") return } return @@ -849,36 +1105,21 @@ func (z *dataUsageEntry) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *dataUsageEntry) EncodeMsg(en *msgp.Writer) (err error) { - // array header, size 8 - err = en.Append(0x98) + // array header, size 5 + err = en.Append(0x95) if err != nil { return } + err = z.Children.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Children") + return + } err = en.WriteInt64(z.Size) if err != nil { err = msgp.WrapError(err, "Size") return } - err = en.WriteUint64(z.ReplicatedSize) - if err != nil { - err = msgp.WrapError(err, "ReplicatedSize") - return - } - err = en.WriteUint64(z.ReplicationPendingSize) - if err != nil { - err = msgp.WrapError(err, "ReplicationPendingSize") - return - } - err = en.WriteUint64(z.ReplicationFailedSize) - if err != nil { - err = msgp.WrapError(err, "ReplicationFailedSize") - return - } - err = en.WriteUint64(z.ReplicaSize) - if err != nil { - err = msgp.WrapError(err, "ReplicaSize") - return - } err = en.WriteUint64(z.Objects) if err != nil { err = msgp.WrapError(err, "Objects") @@ -896,9 +1137,9 @@ func (z *dataUsageEntry) EncodeMsg(en *msgp.Writer) (err error) { return } } - err = z.Children.EncodeMsg(en) + err = z.ReplicationStats.EncodeMsg(en) if err != nil { - err = msgp.WrapError(err, "Children") + err = msgp.WrapError(err, "ReplicationStats") return } return @@ -907,21 +1148,22 @@ func (z *dataUsageEntry) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *dataUsageEntry) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // array header, size 8 - o = append(o, 0x98) + // array header, size 5 + o = append(o, 0x95) + o, err = z.Children.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Children") + return + } o = msgp.AppendInt64(o, z.Size) - o = msgp.AppendUint64(o, z.ReplicatedSize) - o = msgp.AppendUint64(o, z.ReplicationPendingSize) - o = msgp.AppendUint64(o, z.ReplicationFailedSize) - o = msgp.AppendUint64(o, z.ReplicaSize) o = msgp.AppendUint64(o, z.Objects) o = msgp.AppendArrayHeader(o, uint32(dataUsageBucketLen)) for za0001 := range z.ObjSizes { o = msgp.AppendUint64(o, z.ObjSizes[za0001]) } - o, err = z.Children.MarshalMsg(o) + o, err = z.ReplicationStats.MarshalMsg(o) if err != nil { - err = msgp.WrapError(err, "Children") + err = msgp.WrapError(err, "ReplicationStats") return } return @@ -935,8 +1177,13 @@ func (z *dataUsageEntry) UnmarshalMsg(bts []byte) (o []byte, err error) { err = 
msgp.WrapError(err) return } - if zb0001 != 8 { - err = msgp.ArrayError{Wanted: 8, Got: zb0001} + if zb0001 != 5 { + err = msgp.ArrayError{Wanted: 5, Got: zb0001} + return + } + bts, err = z.Children.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Children") return } z.Size, bts, err = msgp.ReadInt64Bytes(bts) @@ -944,26 +1191,6 @@ func (z *dataUsageEntry) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Size") return } - z.ReplicatedSize, bts, err = msgp.ReadUint64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "ReplicatedSize") - return - } - z.ReplicationPendingSize, bts, err = msgp.ReadUint64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "ReplicationPendingSize") - return - } - z.ReplicationFailedSize, bts, err = msgp.ReadUint64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "ReplicationFailedSize") - return - } - z.ReplicaSize, bts, err = msgp.ReadUint64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "ReplicaSize") - return - } z.Objects, bts, err = msgp.ReadUint64Bytes(bts) if err != nil { err = msgp.WrapError(err, "Objects") @@ -986,9 +1213,9 @@ func (z *dataUsageEntry) UnmarshalMsg(bts []byte) (o []byte, err error) { return } } - bts, err = z.Children.UnmarshalMsg(bts) + bts, err = z.ReplicationStats.UnmarshalMsg(bts) if err != nil { - err = msgp.WrapError(err, "Children") + err = msgp.WrapError(err, "ReplicationStats") return } o = bts @@ -997,7 +1224,7 @@ func (z *dataUsageEntry) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *dataUsageEntry) Msgsize() (s int) { - s = 1 + msgp.Int64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.ArrayHeaderSize + (dataUsageBucketLen * (msgp.Uint64Size)) + z.Children.Msgsize() + s = 1 + z.Children.Msgsize() + msgp.Int64Size + msgp.Uint64Size + msgp.ArrayHeaderSize + (dataUsageBucketLen * (msgp.Uint64Size)) + z.ReplicationStats.Msgsize() return } @@ -1158,6 +1385,227 @@ func (z *dataUsageEntryV2) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *dataUsageEntryV3) DecodeMsg(dc *msgp.Reader) (err error) { + var zb0001 uint32 + zb0001, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != 8 { + err = msgp.ArrayError{Wanted: 8, Got: zb0001} + return + } + z.Size, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "Size") + return + } + z.ReplicatedSize, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ReplicatedSize") + return + } + z.ReplicationPendingSize, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ReplicationPendingSize") + return + } + z.ReplicationFailedSize, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ReplicationFailedSize") + return + } + z.ReplicaSize, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ReplicaSize") + return + } + z.Objects, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "Objects") + return + } + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "ObjSizes") + return + } + if zb0002 != uint32(dataUsageBucketLen) { + err = msgp.ArrayError{Wanted: uint32(dataUsageBucketLen), Got: zb0002} + return + } + for za0001 := range z.ObjSizes { + z.ObjSizes[za0001], err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ObjSizes", 
za0001) + return + } + } + err = z.Children.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Children") + return + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *dataUsageEntryV3) EncodeMsg(en *msgp.Writer) (err error) { + // array header, size 8 + err = en.Append(0x98) + if err != nil { + return + } + err = en.WriteInt64(z.Size) + if err != nil { + err = msgp.WrapError(err, "Size") + return + } + err = en.WriteUint64(z.ReplicatedSize) + if err != nil { + err = msgp.WrapError(err, "ReplicatedSize") + return + } + err = en.WriteUint64(z.ReplicationPendingSize) + if err != nil { + err = msgp.WrapError(err, "ReplicationPendingSize") + return + } + err = en.WriteUint64(z.ReplicationFailedSize) + if err != nil { + err = msgp.WrapError(err, "ReplicationFailedSize") + return + } + err = en.WriteUint64(z.ReplicaSize) + if err != nil { + err = msgp.WrapError(err, "ReplicaSize") + return + } + err = en.WriteUint64(z.Objects) + if err != nil { + err = msgp.WrapError(err, "Objects") + return + } + err = en.WriteArrayHeader(uint32(dataUsageBucketLen)) + if err != nil { + err = msgp.WrapError(err, "ObjSizes") + return + } + for za0001 := range z.ObjSizes { + err = en.WriteUint64(z.ObjSizes[za0001]) + if err != nil { + err = msgp.WrapError(err, "ObjSizes", za0001) + return + } + } + err = z.Children.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Children") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *dataUsageEntryV3) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // array header, size 8 + o = append(o, 0x98) + o = msgp.AppendInt64(o, z.Size) + o = msgp.AppendUint64(o, z.ReplicatedSize) + o = msgp.AppendUint64(o, z.ReplicationPendingSize) + o = msgp.AppendUint64(o, z.ReplicationFailedSize) + o = msgp.AppendUint64(o, z.ReplicaSize) + o = msgp.AppendUint64(o, z.Objects) + o = msgp.AppendArrayHeader(o, uint32(dataUsageBucketLen)) + for za0001 := range z.ObjSizes { + o = msgp.AppendUint64(o, z.ObjSizes[za0001]) + } + o, err = z.Children.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Children") + return + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *dataUsageEntryV3) UnmarshalMsg(bts []byte) (o []byte, err error) { + var zb0001 uint32 + zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != 8 { + err = msgp.ArrayError{Wanted: 8, Got: zb0001} + return + } + z.Size, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Size") + return + } + z.ReplicatedSize, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ReplicatedSize") + return + } + z.ReplicationPendingSize, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ReplicationPendingSize") + return + } + z.ReplicationFailedSize, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ReplicationFailedSize") + return + } + z.ReplicaSize, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ReplicaSize") + return + } + z.Objects, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Objects") + return + } + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ObjSizes") + return + } + if zb0002 != uint32(dataUsageBucketLen) { + err = msgp.ArrayError{Wanted: uint32(dataUsageBucketLen), Got: zb0002} + 
return + } + for za0001 := range z.ObjSizes { + z.ObjSizes[za0001], bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ObjSizes", za0001) + return + } + } + bts, err = z.Children.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Children") + return + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *dataUsageEntryV3) Msgsize() (s int) { + s = 1 + msgp.Int64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.ArrayHeaderSize + (dataUsageBucketLen * (msgp.Uint64Size)) + z.Children.Msgsize() + return +} + // DecodeMsg implements msgp.Decodable func (z *dataUsageHash) DecodeMsg(dc *msgp.Reader) (err error) { { @@ -1210,6 +1658,221 @@ func (z dataUsageHash) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *replicationStats) DecodeMsg(dc *msgp.Reader) (err error) { + var zb0001 uint32 + zb0001, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != 10 { + err = msgp.ArrayError{Wanted: 10, Got: zb0001} + return + } + z.PendingSize, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "PendingSize") + return + } + z.ReplicatedSize, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ReplicatedSize") + return + } + z.FailedSize, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "FailedSize") + return + } + z.ReplicaSize, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ReplicaSize") + return + } + z.FailedCount, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "FailedCount") + return + } + z.PendingCount, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "PendingCount") + return + } + z.MissedThresholdSize, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "MissedThresholdSize") + return + } + z.AfterThresholdSize, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "AfterThresholdSize") + return + } + z.MissedThresholdCount, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "MissedThresholdCount") + return + } + z.AfterThresholdCount, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "AfterThresholdCount") + return + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *replicationStats) EncodeMsg(en *msgp.Writer) (err error) { + // array header, size 10 + err = en.Append(0x9a) + if err != nil { + return + } + err = en.WriteUint64(z.PendingSize) + if err != nil { + err = msgp.WrapError(err, "PendingSize") + return + } + err = en.WriteUint64(z.ReplicatedSize) + if err != nil { + err = msgp.WrapError(err, "ReplicatedSize") + return + } + err = en.WriteUint64(z.FailedSize) + if err != nil { + err = msgp.WrapError(err, "FailedSize") + return + } + err = en.WriteUint64(z.ReplicaSize) + if err != nil { + err = msgp.WrapError(err, "ReplicaSize") + return + } + err = en.WriteUint64(z.FailedCount) + if err != nil { + err = msgp.WrapError(err, "FailedCount") + return + } + err = en.WriteUint64(z.PendingCount) + if err != nil { + err = msgp.WrapError(err, "PendingCount") + return + } + err = en.WriteUint64(z.MissedThresholdSize) + if err != nil { + err = msgp.WrapError(err, "MissedThresholdSize") + return + } + err = en.WriteUint64(z.AfterThresholdSize) + if err != nil { + err = msgp.WrapError(err, "AfterThresholdSize") + return + } + err = 
en.WriteUint64(z.MissedThresholdCount) + if err != nil { + err = msgp.WrapError(err, "MissedThresholdCount") + return + } + err = en.WriteUint64(z.AfterThresholdCount) + if err != nil { + err = msgp.WrapError(err, "AfterThresholdCount") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *replicationStats) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // array header, size 10 + o = append(o, 0x9a) + o = msgp.AppendUint64(o, z.PendingSize) + o = msgp.AppendUint64(o, z.ReplicatedSize) + o = msgp.AppendUint64(o, z.FailedSize) + o = msgp.AppendUint64(o, z.ReplicaSize) + o = msgp.AppendUint64(o, z.FailedCount) + o = msgp.AppendUint64(o, z.PendingCount) + o = msgp.AppendUint64(o, z.MissedThresholdSize) + o = msgp.AppendUint64(o, z.AfterThresholdSize) + o = msgp.AppendUint64(o, z.MissedThresholdCount) + o = msgp.AppendUint64(o, z.AfterThresholdCount) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *replicationStats) UnmarshalMsg(bts []byte) (o []byte, err error) { + var zb0001 uint32 + zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != 10 { + err = msgp.ArrayError{Wanted: 10, Got: zb0001} + return + } + z.PendingSize, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "PendingSize") + return + } + z.ReplicatedSize, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ReplicatedSize") + return + } + z.FailedSize, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "FailedSize") + return + } + z.ReplicaSize, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ReplicaSize") + return + } + z.FailedCount, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "FailedCount") + return + } + z.PendingCount, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "PendingCount") + return + } + z.MissedThresholdSize, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "MissedThresholdSize") + return + } + z.AfterThresholdSize, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "AfterThresholdSize") + return + } + z.MissedThresholdCount, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "MissedThresholdCount") + return + } + z.AfterThresholdCount, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "AfterThresholdCount") + return + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *replicationStats) Msgsize() (s int) { + s = 1 + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + return +} + // DecodeMsg implements msgp.Decodable func (z *sizeHistogram) DecodeMsg(dc *msgp.Reader) (err error) { var zb0001 uint32 diff --git a/cmd/data-usage-cache_gen_test.go b/cmd/data-usage-cache_gen_test.go index 51d2173b0..635139b3d 100644 --- a/cmd/data-usage-cache_gen_test.go +++ b/cmd/data-usage-cache_gen_test.go @@ -348,6 +348,119 @@ func BenchmarkDecodedataUsageCacheV2(b *testing.B) { } } +func TestMarshalUnmarshaldataUsageCacheV3(t *testing.T) { + v := dataUsageCacheV3{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := 
v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgdataUsageCacheV3(b *testing.B) { + v := dataUsageCacheV3{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgdataUsageCacheV3(b *testing.B) { + v := dataUsageCacheV3{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshaldataUsageCacheV3(b *testing.B) { + v := dataUsageCacheV3{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodedataUsageCacheV3(t *testing.T) { + v := dataUsageCacheV3{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodedataUsageCacheV3 Msgsize() is inaccurate") + } + + vn := dataUsageCacheV3{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodedataUsageCacheV3(b *testing.B) { + v := dataUsageCacheV3{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodedataUsageCacheV3(b *testing.B) { + v := dataUsageCacheV3{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshaldataUsageEntry(t *testing.T) { v := dataUsageEntry{} bts, err := v.MarshalMsg(nil) @@ -574,6 +687,232 @@ func BenchmarkDecodedataUsageEntryV2(b *testing.B) { } } +func TestMarshalUnmarshaldataUsageEntryV3(t *testing.T) { + v := dataUsageEntryV3{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgdataUsageEntryV3(b *testing.B) { + v := dataUsageEntryV3{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgdataUsageEntryV3(b *testing.B) { + v := dataUsageEntryV3{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshaldataUsageEntryV3(b *testing.B) { + v := dataUsageEntryV3{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; 
i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodedataUsageEntryV3(t *testing.T) { + v := dataUsageEntryV3{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodedataUsageEntryV3 Msgsize() is inaccurate") + } + + vn := dataUsageEntryV3{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodedataUsageEntryV3(b *testing.B) { + v := dataUsageEntryV3{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodedataUsageEntryV3(b *testing.B) { + v := dataUsageEntryV3{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalreplicationStats(t *testing.T) { + v := replicationStats{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgreplicationStats(b *testing.B) { + v := replicationStats{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgreplicationStats(b *testing.B) { + v := replicationStats{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalreplicationStats(b *testing.B) { + v := replicationStats{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodereplicationStats(t *testing.T) { + v := replicationStats{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodereplicationStats Msgsize() is inaccurate") + } + + vn := replicationStats{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodereplicationStats(b *testing.B) { + v := replicationStats{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodereplicationStats(b *testing.B) { + v := replicationStats{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := 
v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalsizeHistogram(t *testing.T) { v := sizeHistogram{} bts, err := v.MarshalMsg(nil) diff --git a/cmd/data-usage.go b/cmd/data-usage.go index 65696751a..f145867a9 100644 --- a/cmd/data-usage.go +++ b/cmd/data-usage.go @@ -25,11 +25,10 @@ import ( jsoniter "github.com/json-iterator/go" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/hash" + "github.com/minio/minio/pkg/madmin" ) const ( - envDataUsageScannerDebug = "MINIO_DISK_USAGE_SCANNER_DEBUG" - dataUsageRoot = SlashSeparator dataUsageBucket = minioMetaBucket + SlashSeparator + bucketMetaPrefix @@ -39,7 +38,7 @@ const ( ) // storeDataUsageInBackend will store all objects sent on the gui channel until closed. -func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dui <-chan DataUsageInfo) { +func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dui <-chan madmin.DataUsageInfo) { for dataUsageInfo := range dui { dataUsageJSON, err := json.Marshal(dataUsageInfo) if err != nil { @@ -59,27 +58,27 @@ func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dui <-chan } } -func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) { +func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (madmin.DataUsageInfo, error) { r, err := objAPI.GetObjectNInfo(ctx, dataUsageBucket, dataUsageObjName, nil, http.Header{}, readLock, ObjectOptions{}) if err != nil { if isErrObjectNotFound(err) || isErrBucketNotFound(err) { - return DataUsageInfo{}, nil + return madmin.DataUsageInfo{}, nil } - return DataUsageInfo{}, toObjectErr(err, dataUsageBucket, dataUsageObjName) + return madmin.DataUsageInfo{}, toObjectErr(err, dataUsageBucket, dataUsageObjName) } defer r.Close() - var dataUsageInfo DataUsageInfo + var dataUsageInfo madmin.DataUsageInfo var json = jsoniter.ConfigCompatibleWithStandardLibrary if err = json.NewDecoder(r).Decode(&dataUsageInfo); err != nil { - return DataUsageInfo{}, err + return madmin.DataUsageInfo{}, err } // For forward compatibility reasons, we need to add this code. if len(dataUsageInfo.BucketsUsage) == 0 { - dataUsageInfo.BucketsUsage = make(map[string]BucketUsageInfo, len(dataUsageInfo.BucketSizes)) + dataUsageInfo.BucketsUsage = make(map[string]madmin.BucketUsageInfo, len(dataUsageInfo.BucketSizes)) for bucket, size := range dataUsageInfo.BucketSizes { - dataUsageInfo.BucketsUsage[bucket] = BucketUsageInfo{Size: size} + dataUsageInfo.BucketsUsage[bucket] = madmin.BucketUsageInfo{Size: size} } } diff --git a/cmd/erasure-metadata-utils.go b/cmd/erasure-metadata-utils.go index 63639e738..75c80c3f6 100644 --- a/cmd/erasure-metadata-utils.go +++ b/cmd/erasure-metadata-utils.go @@ -135,7 +135,8 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve errFileVersionNotFound, errDiskNotFound, }...) { - logger.LogOnceIf(ctx, fmt.Errorf("Drive %s returned an error (%w)", disks[index], err), + logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)", + disks[index], bucket, object, err), disks[index].String()) } } diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go index 9dada5eb1..9d07b8dd0 100644 --- a/cmd/erasure-metadata.go +++ b/cmd/erasure-metadata.go @@ -322,8 +322,12 @@ func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix return errDiskNotFound } // Pick one FileInfo for a disk at index. 
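	// Work on a copy so the Erasure.Index assignment and the validity
	// check stay local; an empty or invalid FileInfo is rejected with
	// errCorruptedFormat before it can reach the lower storage layers.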
- files[index].Erasure.Index = index + 1 - return disks[index].WriteMetadata(ctx, bucket, prefix, files[index]) + fi := files[index] + fi.Erasure.Index = index + 1 + if fi.IsValid() { + return disks[index].WriteMetadata(ctx, bucket, prefix, fi) + } + return errCorruptedFormat }, index) } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index d72b36926..6b96c4648 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -72,7 +72,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d // Read metadata associated with the object from all disks. storageDisks := er.getDisks() - metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, false) + metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, true) // get Quorum for this object readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount) @@ -1215,7 +1215,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin disks := er.getDisks() // Read metadata associated with the object from all disks. - metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false) + metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, true) readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount) if err != nil { @@ -1289,7 +1289,7 @@ func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object st disks := er.getDisks() // Read metadata associated with the object from all disks. - metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false) + metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, true) readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount) if err != nil { diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 72ba0a898..3658d6c1f 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -433,7 +433,7 @@ func (z *erasureServerPools) StorageInfo(ctx context.Context) (StorageInfo, []er return storageInfo, errs } -func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error { +func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- madmin.DataUsageInfo) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -448,7 +448,7 @@ func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, upd } if len(allBuckets) == 0 { - updates <- DataUsageInfo{} // no buckets found update data usage to reflect latest state + updates <- madmin.DataUsageInfo{} // no buckets found update data usage to reflect latest state return nil } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 1249f7646..ac8393bef 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -237,7 +237,7 @@ func (fs *FSObjects) StorageInfo(ctx context.Context) (StorageInfo, []error) { } // NSScanner returns data usage stats of the current FS deployment -func (fs *FSObjects) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error { +func (fs *FSObjects) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- madmin.DataUsageInfo) error { // Load bucket totals var totalCache dataUsageCache err := totalCache.load(ctx, fs, dataUsageCacheName) diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 6fcd7a183..08d8b766d 100644 --- 
a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -47,7 +47,7 @@ func (a GatewayUnsupported) LocalStorageInfo(ctx context.Context) (StorageInfo, } // NSScanner - scanner is not implemented for gateway -func (a GatewayUnsupported) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error { +func (a GatewayUnsupported) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- madmin.DataUsageInfo) error { logger.CriticalIf(ctx, errors.New("not implemented")) return NotImplemented{} } diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index 21bc7c02e..3447a55d7 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -44,7 +44,7 @@ const ( healMetricNamespace MetricNamespace = "minio_heal" interNodeMetricNamespace MetricNamespace = "minio_inter_node" nodeMetricNamespace MetricNamespace = "minio_node" - minIOMetricNamespace MetricNamespace = "minio" + minioMetricNamespace MetricNamespace = "minio" s3MetricNamespace MetricNamespace = "minio_s3" ) @@ -93,9 +93,11 @@ const ( writeTotal MetricName = "write_total" total MetricName = "total" + failedCount MetricName = "failed_count" failedBytes MetricName = "failed_bytes" freeBytes MetricName = "free_bytes" pendingBytes MetricName = "pending_bytes" + pendingCount MetricName = "pending_count" readBytes MetricName = "read_bytes" rcharBytes MetricName = "rchar_bytes" receivedBytes MetricName = "received_bytes" @@ -356,6 +358,16 @@ func getNodeDiskTotalBytesMD() MetricDescription { Type: gaugeMetric, } } +func getUsageLastScanActivityMD() MetricDescription { + return MetricDescription{ + Namespace: minioMetricNamespace, + Subsystem: usageSubsystem, + Name: lastActivityTime, + Help: "Time elapsed (in nano seconds) since last scan activity. This is set to 0 until first scan cycle", + Type: gaugeMetric, + } +} + func getBucketUsageTotalBytesMD() MetricDescription { return MetricDescription{ Namespace: bucketMetricNamespace, @@ -410,6 +422,24 @@ func getBucketRepReceivedBytesMD() MetricDescription { Type: gaugeMetric, } } +func getBucketRepPendingOperationsMD() MetricDescription { + return MetricDescription{ + Namespace: bucketMetricNamespace, + Subsystem: replicationSubsystem, + Name: pendingCount, + Help: "Total number of objects pending replication", + Type: gaugeMetric, + } +} +func getBucketRepFailedOperationsMD() MetricDescription { + return MetricDescription{ + Namespace: bucketMetricNamespace, + Subsystem: replicationSubsystem, + Name: failedCount, + Help: "Total number of objects which failed replication", + Type: gaugeMetric, + } +} func getBucketObjectDistributionMD() MetricDescription { return MetricDescription{ Namespace: bucketMetricNamespace, @@ -666,7 +696,7 @@ func getNodeOfflineTotalMD() MetricDescription { } func getMinIOVersionMD() MetricDescription { return MetricDescription{ - Namespace: minIOMetricNamespace, + Namespace: minioMetricNamespace, Subsystem: softwareSubsystem, Name: versionInfo, Help: "MinIO Release tag for the server", @@ -675,7 +705,7 @@ func getMinIOVersionMD() MetricDescription { } func getMinIOCommitMD() MetricDescription { return MetricDescription{ - Namespace: minIOMetricNamespace, + Namespace: minioMetricNamespace, Subsystem: softwareSubsystem, Name: commitInfo, Help: "Git commit hash for the MinIO release.", @@ -996,13 +1026,14 @@ func getMinioHealingMetrics() MetricsGroup { if !exists { return } - var dur time.Duration - if !bgSeq.lastHealActivity.IsZero() { - dur = time.Since(bgSeq.lastHealActivity) + + if bgSeq.lastHealActivity.IsZero() { + return } + metrics = 
append(metrics, Metric{ Description: getHealLastActivityTimeMD(), - Value: float64(dur), + Value: float64(time.Since(bgSeq.lastHealActivity)), }) metrics = append(metrics, getObjectsScanned(bgSeq)...) metrics = append(metrics, getScannedItems(bgSeq)...) @@ -1224,7 +1255,14 @@ func getBucketUsageMetrics() MetricsGroup { return } + metrics = append(metrics, Metric{ + Description: getUsageLastScanActivityMD(), + Value: float64(time.Since(dataUsageInfo.LastUpdate)), + }) + for bucket, usage := range dataUsageInfo.BucketsUsage { + stat := getLatestReplicationStats(bucket, usage) + metrics = append(metrics, Metric{ Description: getBucketUsageTotalBytesMD(), Value: float64(usage.Size), @@ -1237,25 +1275,35 @@ func getBucketUsageMetrics() MetricsGroup { VariableLabels: map[string]string{"bucket": bucket}, }) - if usage.hasReplicationUsage() { + if stat.hasReplicationUsage() { metrics = append(metrics, Metric{ Description: getBucketRepPendingBytesMD(), - Value: float64(usage.ReplicationPendingSize), + Value: float64(stat.PendingSize), VariableLabels: map[string]string{"bucket": bucket}, }) metrics = append(metrics, Metric{ Description: getBucketRepFailedBytesMD(), - Value: float64(usage.ReplicationFailedSize), + Value: float64(stat.FailedSize), VariableLabels: map[string]string{"bucket": bucket}, }) metrics = append(metrics, Metric{ Description: getBucketRepSentBytesMD(), - Value: float64(usage.ReplicatedSize), + Value: float64(stat.ReplicatedSize), VariableLabels: map[string]string{"bucket": bucket}, }) metrics = append(metrics, Metric{ Description: getBucketRepReceivedBytesMD(), - Value: float64(usage.ReplicaSize), + Value: float64(stat.ReplicaSize), + VariableLabels: map[string]string{"bucket": bucket}, + }) + metrics = append(metrics, Metric{ + Description: getBucketRepPendingOperationsMD(), + Value: float64(stat.PendingCount), + VariableLabels: map[string]string{"bucket": bucket}, + }) + metrics = append(metrics, Metric{ + Description: getBucketRepFailedOperationsMD(), + Value: float64(stat.FailedCount), VariableLabels: map[string]string{"bucket": bucket}, }) } @@ -1372,13 +1420,6 @@ func getClusterStorageMetrics() MetricsGroup { } } -func (b *BucketUsageInfo) hasReplicationUsage() bool { - return b.ReplicationPendingSize > 0 || - b.ReplicationFailedSize > 0 || - b.ReplicatedSize > 0 || - b.ReplicaSize > 0 -} - type minioClusterCollector struct { desc *prometheus.Desc } diff --git a/cmd/metrics.go b/cmd/metrics.go index f4c3a004f..1f86be8c5 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -23,6 +23,7 @@ import ( "time" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/madmin" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -430,6 +431,39 @@ func networkMetricsPrometheus(ch chan<- prometheus.Metric) { ) } +// get the most current of in-memory replication stats and data usage info from crawler. +func getLatestReplicationStats(bucket string, u madmin.BucketUsageInfo) BucketReplicationStats { + s := BucketReplicationStats{ + PendingSize: u.ReplicationPendingSize, + FailedSize: u.ReplicationFailedSize, + ReplicatedSize: u.ReplicatedSize, + ReplicaSize: u.ReplicaSize, + PendingCount: u.ReplicationPendingCount, + FailedCount: u.ReplicationFailedCount, + } + rStat := globalReplicationStats.Get(bucket) + // use in memory replication stats if it is ahead of usage info. 
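+	// (the scanner's usage snapshot can lag behind these live counters, so
+	// for each field whichever source reports the larger value is taken to
+	// be the fresher one.)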
+ if rStat.ReplicatedSize > u.ReplicatedSize { + s.ReplicatedSize = rStat.ReplicatedSize + } + if rStat.PendingSize > u.ReplicationPendingSize { + s.PendingSize = rStat.PendingSize + } + if rStat.FailedSize > u.ReplicationFailedSize { + s.FailedSize = rStat.FailedSize + } + if rStat.ReplicaSize > u.ReplicaSize { + s.ReplicaSize = rStat.ReplicaSize + } + if rStat.PendingCount > u.ReplicationPendingCount { + s.PendingCount = rStat.PendingCount + } + if rStat.FailedCount > u.ReplicationFailedCount { + s.FailedCount = rStat.FailedCount + } + return s +} + // Populates prometheus with bucket usage metrics, this metrics // is only enabled if scanner is enabled. func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) { @@ -447,13 +481,13 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) { if err != nil { return } - // data usage has not captured any data yet. if dataUsageInfo.LastUpdate.IsZero() { return } for bucket, usageInfo := range dataUsageInfo.BucketsUsage { + stat := getLatestReplicationStats(bucket, usageInfo) // Total space used by bucket ch <- prometheus.MustNewConstMetric( prometheus.NewDesc( @@ -479,7 +513,7 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) { "Total capacity pending to be replicated", []string{"bucket"}, nil), prometheus.GaugeValue, - float64(usageInfo.ReplicationPendingSize), + float64(stat.PendingSize), bucket, ) ch <- prometheus.MustNewConstMetric( @@ -488,7 +522,7 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) { "Total capacity failed to replicate at least once", []string{"bucket"}, nil), prometheus.GaugeValue, - float64(usageInfo.ReplicationFailedSize), + float64(stat.FailedSize), bucket, ) ch <- prometheus.MustNewConstMetric( @@ -497,7 +531,7 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) { "Total capacity replicated to destination", []string{"bucket"}, nil), prometheus.GaugeValue, - float64(usageInfo.ReplicatedSize), + float64(stat.ReplicatedSize), bucket, ) ch <- prometheus.MustNewConstMetric( @@ -506,7 +540,25 @@ func bucketUsageMetricsPrometheus(ch chan<- prometheus.Metric) { "Total capacity replicated to this instance", []string{"bucket"}, nil), prometheus.GaugeValue, - float64(usageInfo.ReplicaSize), + float64(stat.ReplicaSize), + bucket, + ) + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName("bucket", "replication", "pending_count"), + "Total replication operations pending", + []string{"bucket"}, nil), + prometheus.GaugeValue, + float64(stat.PendingCount), + bucket, + ) + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName("bucket", "replication", "failed_count"), + "Total replication operations failed", + []string{"bucket"}, nil), + prometheus.GaugeValue, + float64(stat.FailedCount), bucket, ) for k, v := range usageInfo.ObjectSizesHistogram { diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index d2d20612a..3d72f78c4 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -69,57 +69,6 @@ var ObjectsHistogramIntervals = []objectHistogramInterval{ {"GREATER_THAN_512_MB", humanize.MiByte * 512, math.MaxInt64}, } -// BucketUsageInfo - bucket usage info provides -// - total size of the bucket -// - total objects in a bucket -// - object size histogram per bucket -type BucketUsageInfo struct { - Size uint64 `json:"size"` - ReplicationPendingSize uint64 `json:"objectsPendingReplicationTotalSize"` - ReplicationFailedSize uint64 `json:"objectsFailedReplicationTotalSize"` - 
ReplicatedSize uint64 `json:"objectsReplicatedTotalSize"` - ReplicaSize uint64 `json:"objectReplicaTotalSize"` - ObjectsCount uint64 `json:"objectsCount"` - ObjectSizesHistogram map[string]uint64 `json:"objectsSizesHistogram"` -} - -// DataUsageInfo represents data usage stats of the underlying Object API -type DataUsageInfo struct { - // LastUpdate is the timestamp of when the data usage info was last updated. - // This does not indicate a full scan. - LastUpdate time.Time `json:"lastUpdate"` - - // Objects total count across all buckets - ObjectsTotalCount uint64 `json:"objectsCount"` - - // Objects total size across all buckets - ObjectsTotalSize uint64 `json:"objectsTotalSize"` - - // Total Size for objects that have not yet been replicated - ReplicationPendingSize uint64 `json:"objectsPendingReplicationTotalSize"` - - // Total size for objects that have witness one or more failures and will be retried - ReplicationFailedSize uint64 `json:"objectsFailedReplicationTotalSize"` - - // Total size for objects that have been replicated to destination - ReplicatedSize uint64 `json:"objectsReplicatedTotalSize"` - - // Total size for objects that are replicas - ReplicaSize uint64 `json:"objectsReplicaTotalSize"` - - // Total number of buckets in this cluster - BucketsCount uint64 `json:"bucketsCount"` - - // Buckets usage info provides following information across all buckets - // - total size of the bucket - // - total objects in a bucket - // - object size histogram per bucket - BucketsUsage map[string]BucketUsageInfo `json:"bucketsUsageInfo"` - - // Deprecated kept here for backward compatibility reasons. - BucketSizes map[string]uint64 `json:"bucketsSizes"` -} - // BucketInfo - represents bucket metadata. type BucketInfo struct { // Name of the bucket. diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 0598ba0de..469c0b872 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -91,7 +91,7 @@ type ObjectLayer interface { // Storage operations. 
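	// NSScanner streams madmin.DataUsageInfo snapshots over the updates
	// channel, so that the scanner and the admin API can share one wire type.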
Shutdown(context.Context) error - NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error + NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- madmin.DataUsageInfo) error BackendInfo() madmin.BackendInfo StorageInfo(ctx context.Context) (StorageInfo, []error) diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 44a1a1c54..d7784ab97 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -1341,7 +1341,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime) encodedSuccessResponse := encodeResponse(response) if replicate, sync := mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String()); replicate { - scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync) + scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType) } setPutObjHeaders(w, objInfo, false) @@ -1656,7 +1656,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } } if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, ""); replicate { - scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync) + scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType) } setPutObjHeaders(w, objInfo, false) @@ -1938,7 +1938,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h } if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, ""); replicate { - scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync) + scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType) } } @@ -3014,7 +3014,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite setPutObjHeaders(w, objInfo, false) if replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()); replicate { - scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync) + scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType) } // Write success response. 
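The added final argument in these handlers distinguishes full-object
replication from metadata-only replication. A minimal sketch of the
convention, assuming the replication.Type that these constants belong to
(the helper name is illustrative, not part of this patch):

	// replicationTypeFor illustrates the convention used above and below:
	// content-changing operations (PUT, copy, multipart complete) schedule
	// whole-object replication, while tagging, retention and legal-hold
	// updates schedule metadata-only replication.
	func replicationTypeFor(contentChanged bool) replication.Type {
		if contentChanged {
			return replication.ObjectReplicationType
		}
		return replication.MetadataReplicationType
	}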
@@ -3294,7 +3294,7 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r return } if replicate { - scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync) + scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.MetadataReplicationType) } writeSuccessResponseHeadersOnly(w) @@ -3467,7 +3467,7 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r return } if replicate { - scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync) + scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.MetadataReplicationType) } writeSuccessNoContent(w) @@ -3650,7 +3650,7 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h } if replicate { - scheduleReplication(ctx, objInfo.Clone(), objAPI, sync) + scheduleReplication(ctx, objInfo.Clone(), objAPI, sync, replication.MetadataReplicationType) } if objInfo.VersionID != "" { @@ -3724,7 +3724,7 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r } if replicate { - scheduleReplication(ctx, oi.Clone(), objAPI, sync) + scheduleReplication(ctx, oi.Clone(), objAPI, sync, replication.MetadataReplicationType) } if oi.VersionID != "" { diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index f743510f4..98c66456e 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -1336,7 +1336,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { } } if mustReplicate { - scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync) + scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType) } reqParams := extractReqParams(r) diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index 63040dd0b..ac11b98c7 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -231,7 +231,7 @@ type xlMetaV2 struct { Versions []xlMetaV2Version `json:"Versions" msg:"Versions"` // data will contain raw data if any. - // data will be one or more versions indexed by storage dir. + // data will be one or more versions indexed by versionID. // To remove all data set to nil. 
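	// Layout: a single version byte followed by a msgp map of
	// versionID -> raw bytes; validate() below walks exactly this shape.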
data xlMetaInlineData `msg:"-"` } @@ -295,28 +295,31 @@ func (x xlMetaInlineData) validate() error { if len(x) == 0 { return nil } + if !x.versionOK() { return fmt.Errorf("xlMetaInlineData: unknown version 0x%x", x[0]) } sz, buf, err := msgp.ReadMapHeaderBytes(x.afterVersion()) if err != nil { - return err + return fmt.Errorf("xlMetaInlineData: %w", err) } + for i := uint32(0); i < sz; i++ { var key []byte key, buf, err = msgp.ReadMapKeyZC(buf) if err != nil { - return err + return fmt.Errorf("xlMetaInlineData: %w", err) } if len(key) == 0 { return fmt.Errorf("xlMetaInlineData: key %d is length 0", i) } _, buf, err = msgp.ReadBytesZC(buf) if err != nil { - return err + return fmt.Errorf("xlMetaInlineData: %w", err) } } + return nil } @@ -564,31 +567,27 @@ func (z *xlMetaV2) AddLegacy(m *xlMetaV1Object) error { func (z *xlMetaV2) Load(buf []byte) error { buf, _, minor, err := checkXL2V1(buf) if err != nil { - return errFileCorrupt + return fmt.Errorf("z.Load %w", err) } switch minor { case 0: _, err = z.UnmarshalMsg(buf) if err != nil { - return errFileCorrupt + return fmt.Errorf("z.Load %w", err) } return nil case 1: v, buf, err := msgp.ReadBytesZC(buf) if err != nil { - return errFileCorrupt + return fmt.Errorf("z.Load version(%d), bufLen(%d) %w", minor, len(buf), err) } - _, err = z.UnmarshalMsg(v) - if err != nil { - return errFileCorrupt + if _, err = z.UnmarshalMsg(v); err != nil { + return fmt.Errorf("z.Load version(%d), vLen(%d), %w", minor, len(v), err) } // Add remaining data. - z.data = nil - if len(buf) > 0 { - z.data = buf - if err := z.data.validate(); err != nil { - return errFileCorrupt - } + z.data = buf + if err = z.data.validate(); err != nil { + return fmt.Errorf("z.Load version(%d), bufLen(%d) %w", minor, len(buf), err) } default: return errors.New("unknown metadata version") diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index dec84fdee..7af109167 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -911,29 +911,28 @@ func (s *xlStorage) WriteMetadata(ctx context.Context, volume, path string, fi F logger.LogIf(ctx, err) return err } + buf, err = xlMeta.AppendTo(nil) if err != nil { logger.LogIf(ctx, err) return err } - if err := xlMeta.Load(buf); err != nil { - panic(err) - } } else { if err = xlMeta.Load(buf); err != nil { logger.LogIf(ctx, err) return err } + if err = xlMeta.AddVersion(fi); err != nil { logger.LogIf(ctx, err) return err } + buf, err = xlMeta.AppendTo(nil) if err != nil { logger.LogIf(ctx, err) return err } - } return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf) @@ -1042,11 +1041,11 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str if len(fi.Data) > 0 || fi.Size == 0 { return fi, nil } + // Reading data for small objects when // - object has not yet transitioned // - object size lesser than 32KiB // - object has maximum of 1 parts - if fi.TransitionStatus == "" && fi.DataDir != "" && fi.Size <= smallFileThreshold && len(fi.Parts) == 1 { // Enable O_DIRECT optionally only if drive supports it. requireDirectIO := globalStorageClass.GetDMA() == storageclass.DMAReadWrite diff --git a/docs/metrics/prometheus/list.md b/docs/metrics/prometheus/list.md index 1a1f6d2e0..c2c0bfff8 100644 --- a/docs/metrics/prometheus/list.md +++ b/docs/metrics/prometheus/list.md @@ -12,6 +12,7 @@ These metrics can be from any MinIO server once per collection. |`minio_bucket_replication_pending_bytes` |Total bytes pending to replicate. 
| |`minio_bucket_replication_received_bytes` |Total number of bytes replicated to this bucket from another source bucket. | |`minio_bucket_replication_sent_bytes` |Total number of bytes replicated to the target bucket. | +|`minio_bucket_replication_pending_count` |Total number of replication operations pending for this bucket. | |`minio_bucket_usage_object_total` |Total number of objects | |`minio_bucket_usage_total_bytes` |Total bucket size in bytes | |`minio_cache_hits_total` |Total number of disk cache hits | diff --git a/pkg/madmin/README.md b/pkg/madmin/README.md index 69afb402a..a5d28c1cd 100644 --- a/pkg/madmin/README.md +++ b/pkg/madmin/README.md @@ -258,15 +258,15 @@ Fetches accounting usage information for the current authenticated user | Param | Type | Description | |--------------------------------|----------------------|-------------------------| | `AccountInfo.AccountName` | _string_ | Account name. | -| `AccountInfo.Buckets` | _[]BucketUsageInfo_ | Bucket usage info. | +| `AccountInfo.Buckets` | _[]BucketAccessInfo_ | Bucket usage info. | | Param | Type | Description | |----------------------------|-----------------|-----------------------------------------| -| `BucketUsageInfo.Name` | _string_ | The name of the current bucket -| `BucketUsageInfo.Size` | _uint64_ | The total size of the current bucket -| `BucketUsageInfo.Created` | _time.Time_ | Bucket creation time -| `BucketUsageInfo.Access` | _AccountAccess_ | Type of access of the current account +| `BucketAccessInfo.Name` | _string_ | The name of the current bucket +| `BucketAccessInfo.Size` | _uint64_ | The total size of the current bucket +| `BucketAccessInfo.Created` | _time.Time_ | Bucket creation time +| `BucketAccessInfo.Access` | _AccountAccess_ | Type of access of the current account | Param | Type | Description | diff --git a/pkg/madmin/info-commands.go b/pkg/madmin/info-commands.go index f37bc1eb1..b6a7970af 100644 --- a/pkg/madmin/info-commands.go +++ b/pkg/madmin/info-commands.go @@ -20,7 +20,6 @@ package madmin import ( "context" "encoding/json" - "io/ioutil" "net/http" "time" ) @@ -124,36 +123,71 @@ func (adm *AdminClient) StorageInfo(ctx context.Context) (StorageInfo, error) { // Unmarshal the server's json response var storageInfo StorageInfo - - respBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return StorageInfo{}, err - } - - err = json.Unmarshal(respBytes, &storageInfo) - if err != nil { + if err = json.NewDecoder(resp.Body).Decode(&storageInfo); err != nil { return StorageInfo{}, err } return storageInfo, nil } -// DataUsageInfo represents data usage of an Object API +// BucketUsageInfo - bucket usage info provides +// - total size of the bucket +// - total objects in a bucket +// - object size histogram per bucket +type BucketUsageInfo struct { + Size uint64 `json:"size"` + ReplicationPendingSize uint64 `json:"objectsPendingReplicationTotalSize"` + ReplicationFailedSize uint64 `json:"objectsFailedReplicationTotalSize"` + ReplicatedSize uint64 `json:"objectsReplicatedTotalSize"` + ReplicaSize uint64 `json:"objectReplicaTotalSize"` + ReplicationPendingCount uint64 `json:"objectsPendingReplicationCount"` + ReplicationFailedCount uint64 `json:"objectsFailedReplicationCount"` + + ObjectsCount uint64 `json:"objectsCount"` + ObjectSizesHistogram map[string]uint64 `json:"objectsSizesHistogram"` +} + +// DataUsageInfo represents data usage stats of the underlying Object API type DataUsageInfo struct { // LastUpdate is the timestamp of when the data usage info was last updated. 
// This does not indicate a full scan. - LastUpdate time.Time `json:"lastUpdate"` - ObjectsCount uint64 `json:"objectsCount"` - ObjectsTotalSize uint64 `json:"objectsTotalSize"` + LastUpdate time.Time `json:"lastUpdate"` - // ObjectsSizesHistogram contains information on objects across all buckets. - // See ObjectsHistogramIntervals. - ObjectsSizesHistogram map[string]uint64 `json:"objectsSizesHistogram"` + // Objects total count across all buckets + ObjectsTotalCount uint64 `json:"objectsCount"` + // Objects total size across all buckets + ObjectsTotalSize uint64 `json:"objectsTotalSize"` + + // Total size for objects that have not yet been replicated + ReplicationPendingSize uint64 `json:"objectsPendingReplicationTotalSize"` + + // Total size for objects that have witnessed one or more failures and will be retried + ReplicationFailedSize uint64 `json:"objectsFailedReplicationTotalSize"` + + // Total size for objects that have been replicated to destination + ReplicatedSize uint64 `json:"objectsReplicatedTotalSize"` + + // Total size for objects that are replicas + ReplicaSize uint64 `json:"objectsReplicaTotalSize"` + + // Total number of objects pending replication + ReplicationPendingCount uint64 `json:"objectsPendingReplicationCount"` + + // Total number of objects that failed replication + ReplicationFailedCount uint64 `json:"objectsFailedReplicationCount"` + + // Total number of buckets in this cluster BucketsCount uint64 `json:"bucketsCount"` - // BucketsSizes is "bucket name" -> size. - BucketsSizes map[string]uint64 `json:"bucketsSizes"` + // Buckets usage info provides the following information across all buckets + // - total size of the bucket + // - total objects in a bucket + // - object size histogram per bucket + BucketsUsage map[string]BucketUsageInfo `json:"bucketsUsageInfo"` + + // Deprecated, kept here for backward compatibility reasons. 
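+	// Older payloads that carry only BucketSizes are converted to
+	// BucketsUsage on load (see loadDataUsageFromBackend).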
+ BucketSizes map[string]uint64 `json:"bucketsSizes"` } // DataUsageInfo - returns data usage of the current object API @@ -171,14 +205,7 @@ func (adm *AdminClient) DataUsageInfo(ctx context.Context) (DataUsageInfo, error // Unmarshal the server's json response var dataUsageInfo DataUsageInfo - - respBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return DataUsageInfo{}, err - } - - err = json.Unmarshal(respBytes, &dataUsageInfo) - if err != nil { + if err = json.NewDecoder(resp.Body).Decode(&dataUsageInfo); err != nil { return DataUsageInfo{}, err } @@ -344,14 +371,7 @@ func (adm *AdminClient) ServerInfo(ctx context.Context) (InfoMessage, error) { // Unmarshal the server's json response var message InfoMessage - - respBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return InfoMessage{}, err - } - - err = json.Unmarshal(respBytes, &message) - if err != nil { + if err = json.NewDecoder(resp.Body).Decode(&message); err != nil { return InfoMessage{}, err } diff --git a/pkg/madmin/update-commands.go b/pkg/madmin/update-commands.go index 0139af8ba..adee12d08 100644 --- a/pkg/madmin/update-commands.go +++ b/pkg/madmin/update-commands.go @@ -20,7 +20,6 @@ package madmin import ( "context" "encoding/json" - "io/ioutil" "net/http" "net/url" ) @@ -53,10 +52,9 @@ func (adm *AdminClient) ServerUpdate(ctx context.Context, updateURL string) (us return us, httpRespToErrorResponse(resp) } - buf, err := ioutil.ReadAll(resp.Body) - if err != nil { + if err = json.NewDecoder(resp.Body).Decode(&us); err != nil { return us, err } - err = json.Unmarshal(buf, &us) - return us, err + + return us, nil } diff --git a/pkg/madmin/user-commands.go b/pkg/madmin/user-commands.go index adc5796a8..ffe67997d 100644 --- a/pkg/madmin/user-commands.go +++ b/pkg/madmin/user-commands.go @@ -35,9 +35,9 @@ type AccountAccess struct { Write bool `json:"write"` } -// BucketUsageInfo represents bucket usage of a bucket, and its relevant +// BucketAccessInfo represents bucket usage of a bucket, and its relevant // access type for an account -type BucketUsageInfo struct { +type BucketAccessInfo struct { Name string `json:"name"` Size uint64 `json:"size"` Created time.Time `json:"created"` @@ -49,7 +49,7 @@ type BucketUsageInfo struct { type AccountInfo struct { AccountName string Policy iampolicy.Policy - Buckets []BucketUsageInfo + Buckets []BucketAccessInfo } // AccountInfo returns the usage info for the authenticating account.
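For reference, a minimal sketch of consuming the renamed type from an admin
client; the endpoint and credentials are placeholders, and it assumes the
AccountInfo(ctx) signature exposed by this version of pkg/madmin:

	package main

	import (
		"context"
		"fmt"
		"log"

		"github.com/minio/minio/pkg/madmin"
	)

	func main() {
		// Placeholder endpoint and credentials.
		adm, err := madmin.New("localhost:9000", "ACCESS_KEY", "SECRET_KEY", false)
		if err != nil {
			log.Fatalln(err)
		}

		// AccountInfo.Buckets is now []madmin.BucketAccessInfo.
		info, err := adm.AccountInfo(context.Background())
		if err != nil {
			log.Fatalln(err)
		}
		for _, b := range info.Buckets {
			fmt.Printf("%s: %d bytes, created %s\n", b.Name, b.Size, b.Created)
		}
	}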