diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index 740f2c4db..627702154 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -857,6 +857,7 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre
 	ri := logger.GetReqInfo(ctx)
 	if ri != nil && strings.Contains(ri.UserAgent, `1.0 Veeam/1.0 Backup`) && strings.HasSuffix(prefix, ".blk") {
 		opts.discardResult = true
+		opts.Transient = true
 	}
 
 	merged, err := z.listPath(ctx, opts)
@@ -1214,6 +1215,31 @@ func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, fo
 	return nil
 }
 
+// deleteAll will delete a bucket+prefix unconditionally across all disks.
+// Note that set distribution is ignored, so it should only be used in cases where
+// data is not distributed across sets.
+// Errors are logged but individual disk failures are not returned.
+func (z *erasureServerPools) deleteAll(ctx context.Context, bucket, prefix string) {
+	for _, servers := range z.serverPools {
+		for _, set := range servers.sets {
+			set.deleteAll(ctx, bucket, prefix)
+		}
+	}
+}
+
+// renameAll will rename bucket+prefix unconditionally across all disks to
+// minioMetaTmpBucket + a unique uuid.
+// Note that set distribution is ignored, so it should only be used in cases where
+// data is not distributed across sets. Errors are logged but individual
+// disk failures are not returned.
+func (z *erasureServerPools) renameAll(ctx context.Context, bucket, prefix string) {
+	for _, servers := range z.serverPools {
+		for _, set := range servers.sets {
+			set.renameAll(ctx, bucket, prefix)
+		}
+	}
+}
+
 // This function is used to undo a successful DeleteBucket operation.
 func undoDeleteBucketServerPools(ctx context.Context, bucket string, serverPools []*erasureSets, errs []error) {
 	g := errgroup.WithNErrs(len(serverPools))
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index 0a807baef..ef81e0ab6 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -29,7 +29,6 @@ import (
 	"time"
 
 	"github.com/dchest/siphash"
-	"github.com/dustin/go-humanize"
 	"github.com/google/uuid"
 	"github.com/minio/minio-go/v7/pkg/set"
 	"github.com/minio/minio-go/v7/pkg/tags"
@@ -38,7 +37,6 @@ import (
 	"github.com/minio/minio/pkg/console"
 	"github.com/minio/minio/pkg/dsync"
 	"github.com/minio/minio/pkg/madmin"
-	"github.com/minio/minio/pkg/objcache"
 	"github.com/minio/minio/pkg/sync/errgroup"
 )
 
@@ -368,11 +366,6 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 	// setCount * setDriveCount with each memory upto blockSizeV1.
 	bp := bpool.NewBytePoolCap(n, blockSizeV1, blockSizeV1*2)
 
-	mcache, err := objcache.New(1*humanize.GiByte, objcache.DefaultExpiry)
-	if err != nil {
-		return nil, err
-	}
-
 	for i := 0; i < setCount; i++ {
 		s.erasureDisks[i] = make([]StorageAPI, setDriveCount)
 	}
@@ -419,7 +412,6 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 			getEndpoints: s.GetEndpoints(i),
 			nsMutex:      mutex,
 			bp:           bp,
-			metaCache:    mcache,
 			mrfOpCh:      make(chan partialOperation, 10000),
 		}
 	}
diff --git a/cmd/erasure.go b/cmd/erasure.go
index 63e2dec5f..24784316d 100644
--- a/cmd/erasure.go
+++ b/cmd/erasure.go
@@ -30,7 +30,6 @@ import (
 	"github.com/minio/minio/pkg/color"
 	"github.com/minio/minio/pkg/dsync"
 	"github.com/minio/minio/pkg/madmin"
-	"github.com/minio/minio/pkg/objcache"
 	"github.com/minio/minio/pkg/sync/errgroup"
 )
 
@@ -71,9 +70,6 @@ type erasureObjects struct {
 	// Byte pools used for temporary i/o buffers.
 	bp *bpool.BytePoolCap
 
-	// holds current list cache.
-	metaCache *objcache.Cache
-
 	mrfOpCh chan partialOperation
 }
diff --git a/cmd/metacache-bucket.go b/cmd/metacache-bucket.go
index 331625d69..44d00ae2a 100644
--- a/cmd/metacache-bucket.go
+++ b/cmd/metacache-bucket.go
@@ -17,6 +17,11 @@ package cmd
 
 import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"net/http"
 	"path"
 	"runtime/debug"
 	"sort"
@@ -24,8 +29,11 @@ import (
 	"sync"
 	"time"
 
+	"github.com/klauspost/compress/s2"
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/console"
+	"github.com/minio/minio/pkg/hash"
+	"github.com/tinylib/msgp/msgp"
 )
 
 //go:generate msgp -file $GOFILE -unexported
@@ -49,7 +57,16 @@ type bucketMetacache struct {
 
 // newBucketMetacache creates a new bucketMetacache.
 // Optionally remove all existing caches.
-func newBucketMetacache(bucket string) *bucketMetacache {
+func newBucketMetacache(bucket string, cleanup bool) *bucketMetacache {
+	if cleanup {
+		// Recursively delete all caches.
+		objAPI := newObjectLayerFn()
+		ez, ok := objAPI.(*erasureServerPools)
+		if ok {
+			ctx := context.Background()
+			ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(bucket, slashSeparator))
+		}
+	}
 	return &bucketMetacache{
 		bucket: bucket,
 		caches: make(map[string]metacache, 10),
@@ -63,6 +80,111 @@ func (b *bucketMetacache) debugf(format string, data ...interface{}) {
 	}
 }
 
+// loadBucketMetaCache will load the cache from the object layer.
+// If the cache cannot be found a new one is created.
+func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache, error) {
+	objAPI := newObjectLayerFn()
+	for objAPI == nil {
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:
+			time.Sleep(250 * time.Millisecond)
+		}
+		objAPI = newObjectLayerFn()
+		if objAPI == nil {
+			logger.LogIf(ctx, fmt.Errorf("loadBucketMetaCache: object layer not ready. bucket: %q", bucket))
+		}
+	}
+
+	var meta bucketMetacache
+	var decErr error
+	// Use global context for this.
+	r, err := objAPI.GetObjectNInfo(GlobalContext, minioMetaBucket, pathJoin("buckets", bucket, ".metacache", "index.s2"), nil, http.Header{}, readLock, ObjectOptions{})
+	if err == nil {
+		dec := s2DecPool.Get().(*s2.Reader)
+		dec.Reset(r)
+		decErr = meta.DecodeMsg(msgp.NewReader(dec))
+		dec.Reset(nil)
+		r.Close()
+		s2DecPool.Put(dec)
+	}
+	if err != nil {
+		switch err.(type) {
+		case ObjectNotFound:
+			err = nil
+		case InsufficientReadQuorum:
+			// Cache is likely lost. Clean up and return new.
+			return newBucketMetacache(bucket, true), nil
+		default:
+			logger.LogIf(ctx, err)
+		}
+		return newBucketMetacache(bucket, false), err
+	}
+	if decErr != nil {
+		if errors.Is(decErr, context.Canceled) {
+			return newBucketMetacache(bucket, false), decErr
+		}
+		// Log the error, but assume the data is lost and return a fresh bucket.
+		// Otherwise a broken cache will never recover.
+		logger.LogIf(ctx, decErr)
+		return newBucketMetacache(bucket, true), nil
+	}
+	// Sanity check...
+	if meta.bucket != bucket {
+		logger.Info("loadBucketMetaCache: loaded cache name mismatch, want %s, got %s. Discarding.", bucket, meta.bucket)
+		return newBucketMetacache(bucket, true), nil
+	}
+	meta.cachesRoot = make(map[string][]string, len(meta.caches)/10)
+	// Index roots
+	for id, cache := range meta.caches {
+		meta.cachesRoot[cache.root] = append(meta.cachesRoot[cache.root], id)
+	}
+	return &meta, nil
+}
+
+// save the bucket cache to the object storage.
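+// The cache index is written as s2-compressed msgpack under the bucket's
+// ".metacache" prefix in minioMetaBucket; transient caches are never saved.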
+func (b *bucketMetacache) save(ctx context.Context) error {
+	if b.transient {
+		return nil
+	}
+	objAPI := newObjectLayerFn()
+	if objAPI == nil {
+		return errServerNotInitialized
+	}
+
+	// Keep lock while we marshal.
+	// We need a write lock since we update 'updated'.
+	b.mu.Lock()
+	if !b.updated {
+		b.mu.Unlock()
+		return nil
+	}
+	// Save as s2 compressed msgpack.
+	tmp := bytes.NewBuffer(make([]byte, 0, b.Msgsize()))
+	enc := s2.NewWriter(tmp)
+	err := msgp.Encode(enc, b)
+	if err != nil {
+		b.mu.Unlock()
+		return err
+	}
+	err = enc.Close()
+	if err != nil {
+		b.mu.Unlock()
+		return err
+	}
+	b.updated = false
+	b.mu.Unlock()
+
+	hr, err := hash.NewReader(tmp, int64(tmp.Len()), "", "", int64(tmp.Len()))
+	if err != nil {
+		return err
+	}
+	_, err = objAPI.PutObject(ctx, minioMetaBucket, pathJoin("buckets", b.bucket, ".metacache", "index.s2"), NewPutObjReader(hr), ObjectOptions{})
+	logger.LogIf(ctx, err)
+	return err
+}
+
 // findCache will attempt to find a matching cache for the provided options.
 // If a cache with the same ID exists already it will be returned.
 // If none can be found a new is created with the provided ID.
@@ -326,6 +448,41 @@ func (b *bucketMetacache) getCache(id string) *metacache {
 	return &c
 }
 
+// deleteAll will delete all on disk data for ALL caches.
+// Deletes are performed concurrently.
+func (b *bucketMetacache) deleteAll() {
+	ctx := context.Background()
+	ez, ok := newObjectLayerFn().(*erasureServerPools)
+	if !ok {
+		logger.LogIf(ctx, errors.New("bucketMetacache: expected objAPI to be *erasureServerPools"))
+		return
+	}
+
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	b.updated = true
+	if !b.transient {
+		// Delete all.
+		ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator))
+		b.caches = make(map[string]metacache, 10)
+		b.cachesRoot = make(map[string][]string, 10)
+		return
+	}
+
+	// Transient caches are in different buckets.
+	var wg sync.WaitGroup
+	for id := range b.caches {
+		wg.Add(1)
+		go func(cache metacache) {
+			defer wg.Done()
+			ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(cache.bucket, cache.id))
+		}(b.caches[id])
+	}
+	wg.Wait()
+	b.caches = make(map[string]metacache, 10)
+}
+
 // deleteCache will delete a specific cache and all files related to it across the cluster.
 func (b *bucketMetacache) deleteCache(id string) {
 	b.mu.Lock()
@@ -344,4 +501,7 @@ func (b *bucketMetacache) deleteCache(id string) {
 		b.updated = true
 	}
 	b.mu.Unlock()
+	if ok {
+		c.delete(context.Background())
+	}
 }
diff --git a/cmd/metacache-bucket_test.go b/cmd/metacache-bucket_test.go
index a58057d09..1f157a4f2 100644
--- a/cmd/metacache-bucket_test.go
+++ b/cmd/metacache-bucket_test.go
@@ -6,7 +6,7 @@ import (
 )
 
 func Benchmark_bucketMetacache_findCache(b *testing.B) {
-	bm := newBucketMetacache("")
+	bm := newBucketMetacache("", false)
 	const elements = 50000
 	const paths = 100
 	if elements%paths != 0 {
diff --git a/cmd/metacache-manager.go b/cmd/metacache-manager.go
index 82276a12a..480b06570 100644
--- a/cmd/metacache-manager.go
+++ b/cmd/metacache-manager.go
@@ -18,6 +18,7 @@ package cmd
 
 import (
 	"context"
+	"fmt"
 	"runtime/debug"
 	"sync"
 	"time"
@@ -41,13 +42,25 @@ type metacacheManager struct {
 	trash map[string]metacache // Recently deleted lists.
 }
 
+const metacacheManagerTransientBucket = "**transient**"
 const metacacheMaxEntries = 5000
 
 // initManager will start async saving the cache.
 func (m *metacacheManager) initManager() {
+	// Add a transient bucket.
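+	// The transient bucket receives listings for reserved or invalid buckets
+	// and is never persisted to disk.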
+ tb := newBucketMetacache(metacacheManagerTransientBucket, false) + tb.transient = true + m.buckets[metacacheManagerTransientBucket] = tb + // Start saver when object layer is ready. go func() { + objAPI := newObjectLayerFn() + for objAPI == nil { + time.Sleep(time.Second) + objAPI = newObjectLayerFn() + } if !globalIsErasure { + logger.Info("metacacheManager was initialized in non-erasure mode, skipping save") return } @@ -55,6 +68,7 @@ func (m *metacacheManager) initManager() { defer t.Stop() var exit bool + bg := context.Background() for !exit { select { case <-t.C: @@ -66,11 +80,13 @@ func (m *metacacheManager) initManager() { if !exit { v.cleanup() } + logger.LogIf(bg, v.save(bg)) } m.mu.RUnlock() m.mu.Lock() for k, v := range m.trash { if time.Since(v.lastUpdate) > metacacheMaxRunningAge { + v.delete(context.Background()) delete(m.trash, k) } } @@ -81,6 +97,9 @@ func (m *metacacheManager) initManager() { // findCache will get a metacache. func (m *metacacheManager) findCache(ctx context.Context, o listPathOptions) metacache { + if o.Transient || isReservedOrInvalidBucket(o.Bucket, false) { + return m.getTransient().findCache(o) + } m.mu.RLock() b, ok := m.buckets[o.Bucket] if ok { @@ -117,6 +136,10 @@ func (m *metacacheManager) updateCacheEntry(update metacache) (metacache, error) func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucketMetacache { m.init.Do(m.initManager) + // Return a transient bucket for invalid or system buckets. + if isReservedOrInvalidBucket(bucket, false) { + return m.getTransient() + } m.mu.RLock() b, ok := m.buckets[bucket] m.mu.RUnlock() @@ -140,7 +163,16 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket return b } - b = newBucketMetacache(bucket) + // Load bucket. If we fail return the transient bucket. + b, err := loadBucketMetaCache(ctx, bucket) + if err != nil { + m.mu.Unlock() + logger.LogIf(ctx, err) + return m.getTransient() + } + if b.bucket != bucket { + logger.LogIf(ctx, fmt.Errorf("getBucket: loaded bucket %s does not match this bucket %s", b.bucket, bucket)) + } m.buckets[bucket] = b m.mu.Unlock() return b @@ -163,6 +195,10 @@ func (m *metacacheManager) deleteBucketCache(bucket string) { b.mu.Lock() defer b.mu.Unlock() for k, v := range b.caches { + if time.Since(v.lastUpdate) > metacacheMaxRunningAge { + v.delete(context.Background()) + continue + } v.error = "Bucket deleted" v.status = scanStateError m.mu.Lock() @@ -176,7 +212,59 @@ func (m *metacacheManager) deleteAll() { m.init.Do(m.initManager) m.mu.Lock() defer m.mu.Unlock() - for bucket := range m.buckets { - delete(m.buckets, bucket) + for bucket, b := range m.buckets { + b.deleteAll() + if !b.transient { + delete(m.buckets, bucket) + } } } + +// getTransient will return a transient bucket. +func (m *metacacheManager) getTransient() *bucketMetacache { + m.init.Do(m.initManager) + m.mu.RLock() + bmc := m.buckets[metacacheManagerTransientBucket] + m.mu.RUnlock() + return bmc +} + +// checkMetacacheState should be used if data is not updating. +// Should only be called if a failure occurred. +func (o listPathOptions) checkMetacacheState(ctx context.Context, rpc *peerRESTClient) error { + // We operate on a copy... 
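+	// (value receiver), so disabling Create below does not affect the caller's options.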
+	o.Create = false
+	var cache metacache
+	if rpc == nil || o.Transient {
+		cache = localMetacacheMgr.findCache(ctx, o)
+	} else {
+		c, err := rpc.GetMetacacheListing(ctx, o)
+		if err != nil {
+			return err
+		}
+		cache = *c
+	}
+
+	if cache.status == scanStateNone || cache.fileNotFound {
+		return errFileNotFound
+	}
+	if cache.status == scanStateSuccess || cache.status == scanStateStarted {
+		if time.Since(cache.lastUpdate) > metacacheMaxRunningAge {
+			// We got a stale entry, mark error on handling server.
+			err := fmt.Errorf("timeout: list %s not updated", cache.id)
+			cache.error = err.Error()
+			cache.status = scanStateError
+			if rpc == nil || o.Transient {
+				localMetacacheMgr.updateCacheEntry(cache)
+			} else {
+				rpc.UpdateMetacacheListing(ctx, cache)
+			}
+			return err
+		}
+		return nil
+	}
+	if cache.error != "" {
+		return fmt.Errorf("async cache listing failed with: %s", cache.error)
+	}
+	return nil
+}
diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go
index 553f4c95c..6dea435c7 100644
--- a/cmd/metacache-server-pool.go
+++ b/cmd/metacache-server-pool.go
@@ -116,6 +116,7 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e
 	// will be generated due to the marker without ID and this check failing.
 	if o.Limit < 10 && o.Marker == "" && o.Create && o.Recursive {
 		o.discardResult = true
+		o.Transient = true
 	}
 
 	var cache metacache
@@ -126,13 +127,12 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e
 		var cache metacache
 		rpc := globalNotificationSys.restClientFromHash(o.Bucket)
 		if isReservedOrInvalidBucket(o.Bucket, false) {
-			// discard all list caches for reserved buckets.
-			o.discardResult = true
 			rpc = nil
+			o.Transient = true
 		}
 		// Apply prefix filter if enabled.
 		o.SetFilter()
-		if rpc == nil {
+		if rpc == nil || o.Transient {
 			// Local
 			cache = localMetacacheMgr.findCache(ctx, o)
 		} else {
@@ -148,6 +148,7 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e
 			if !errors.Is(err, context.DeadlineExceeded) {
 				logger.LogIf(ctx, err)
 			}
+			o.Transient = true
 			cache = localMetacacheMgr.findCache(ctx, o)
 		} else {
 			cache = *c
diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go
index 207d699d1..4f42e42cd 100644
--- a/cmd/metacache-set.go
+++ b/cmd/metacache-set.go
@@ -17,18 +17,23 @@ package cmd
 
 import (
+	"bytes"
 	"context"
 	"encoding/gob"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
 
+	jsoniter "github.com/json-iterator/go"
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/color"
 	"github.com/minio/minio/pkg/console"
+	"github.com/minio/minio/pkg/hash"
 )
 
 type listPathOptions struct {
@@ -86,6 +91,11 @@ type listPathOptions struct {
 	// Include pure directories.
 	IncludeDirectories bool
 
+	// Transient is set if the cache is transient due to an error or being a reserved bucket.
+	// This means the cache metadata will not be persisted on disk.
+	// A transient result will never be returned from the cache, so knowing the list id is required.
+	Transient bool
+
 	// discardResult will not persist the cache to storage.
 	// When the initial results are returned listing will be canceled.
 	discardResult bool
@@ -186,14 +196,74 @@ func (o *listPathOptions) gatherResults(in <-chan metaCacheEntry) func() (metaCa
 	}
 }
 
+// findFirstPart will find the part number (0 being the first) that corresponds to the marker in the options.
+// io.ErrUnexpectedEOF is returned if the place containing the marker hasn't been scanned yet.
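+// Callers should treat io.ErrUnexpectedEOF as transient and retry after a short delay.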
+// io.EOF indicates the marker is beyond the end of the stream and does not exist.
+func (o *listPathOptions) findFirstPart(fi FileInfo) (int, error) {
+	search := o.Marker
+	if search == "" {
+		search = o.Prefix
+	}
+	if search == "" {
+		return 0, nil
+	}
+	o.debugln("searching for ", search)
+	var tmp metacacheBlock
+	var json = jsoniter.ConfigCompatibleWithStandardLibrary
+	i := 0
+	for {
+		partKey := fmt.Sprintf("%s-metacache-part-%d", ReservedMetadataPrefixLower, i)
+		v, ok := fi.Metadata[partKey]
+		if !ok {
+			o.debugln("no match in metadata, waiting")
+			return -1, io.ErrUnexpectedEOF
+		}
+		err := json.Unmarshal([]byte(v), &tmp)
+		if err != nil {
+			logger.LogIf(context.Background(), err)
+			return -1, err
+		}
+		if tmp.First == "" && tmp.Last == "" && tmp.EOS {
+			return 0, errFileNotFound
+		}
+		if tmp.First >= search {
+			o.debugln("First >= search", v)
+			return i, nil
+		}
+		if tmp.Last >= search {
+			o.debugln("Last >= search", v)
+			return i, nil
+		}
+		if tmp.EOS {
+			o.debugln("no match, at EOS", v)
+			return -3, io.EOF
+		}
+		o.debugln("First ", tmp.First, "<", search, " search", i)
+		i++
+	}
+}
+
 // updateMetacacheListing will update the metacache listing.
 func (o *listPathOptions) updateMetacacheListing(m metacache, rpc *peerRESTClient) (metacache, error) {
+	if o.Transient {
+		return localMetacacheMgr.getTransient().updateCacheEntry(m)
+	}
 	if rpc == nil {
 		return localMetacacheMgr.updateCacheEntry(m)
 	}
 	return rpc.UpdateMetacacheListing(context.Background(), m)
 }
 
+func getMetacacheBlockInfo(fi FileInfo, block int) (*metacacheBlock, error) {
+	var tmp metacacheBlock
+	partKey := fmt.Sprintf("%s-metacache-part-%d", ReservedMetadataPrefixLower, block)
+	v, ok := fi.Metadata[partKey]
+	if !ok {
+		return nil, io.ErrUnexpectedEOF
+	}
+	return &tmp, json.Unmarshal([]byte(v), &tmp)
+}
+
 const metacachePrefix = ".metacache"
 
 func metacachePrefixForID(bucket, id string) string {
@@ -201,8 +271,8 @@
 }
 
 // objectPath returns the object path of the cache.
-func (o *listPathOptions) objectPath() string {
-	return pathJoin(metacachePrefixForID(o.Bucket, o.ID), "block.s2")
+func (o *listPathOptions) objectPath(block int) string {
+	return pathJoin(metacachePrefixForID(o.Bucket, o.ID), "block-"+strconv.Itoa(block)+".s2")
 }
 
 func (o *listPathOptions) SetFilter() {
@@ -287,33 +357,187 @@ func (r *metacacheReader) filter(o listPathOptions) (entries metaCacheEntriesSor
 }
 
 func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
-	r, err := er.metaCache.Open(pathJoin(minioMetaBucket, o.objectPath()), time.Now().Add(-time.Hour))
-	if err != nil {
-		return entries, io.EOF
-	}
+	retries := 0
+	rpc := globalNotificationSys.restClientFromHash(o.Bucket)
 
-	tmp, err := newMetacacheReader(r)
-	if err != nil {
-		return entries, err
-	}
+	for {
+		select {
+		case <-ctx.Done():
+			return entries, ctx.Err()
+		default:
+		}
 
-	e, err := tmp.filter(o)
-	entries.o = append(entries.o, e.o...)
-	if o.Limit > 0 && entries.len() > o.Limit {
-		entries.truncate(o.Limit)
-		return entries, nil
-	}
+		// If many failures, check the cache state.
+		if retries > 10 {
+			err := o.checkMetacacheState(ctx, rpc)
+			if err != nil {
+				return entries, fmt.Errorf("remote listing canceled: %w", err)
+			}
+			retries = 1
+		}
 
-	if err == nil {
-		// We stopped within the listing, we are done for now...
-		return entries, nil
-	}
+		const retryDelay = 500 * time.Millisecond
+		// Load first part metadata...
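+		// On retries a single online disk is probed first as a cheap existence check
+		// before metadata is read from all disks.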
+		// All operations are performed without locks, so we must be careful and allow for failures.
+		// Read metadata associated with the object from a disk.
+		if retries > 0 {
+			disks := er.getOnlineDisks()
+			if len(disks) == 0 {
+				time.Sleep(retryDelay)
+				retries++
+				continue
+			}
 
-	if !errors.Is(err, io.EOF) {
-		logger.LogIf(ctx, err)
-	}
+			_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "", false)
+			if err != nil {
+				time.Sleep(retryDelay)
+				retries++
+				continue
+			}
+		}
 
-	return entries, err
+		// Read metadata associated with the object from all disks.
+		fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{}, true)
+		if err != nil {
+			switch toObjectErr(err, minioMetaBucket, o.objectPath(0)).(type) {
+			case ObjectNotFound:
+				retries++
+				time.Sleep(retryDelay)
+				continue
+			case InsufficientReadQuorum:
+				retries++
+				time.Sleep(retryDelay)
+				continue
+			default:
+				return entries, fmt.Errorf("reading first part metadata: %w", err)
+			}
+		}
+
+		partN, err := o.findFirstPart(fi)
+		switch {
+		case err == nil:
+		case errors.Is(err, io.ErrUnexpectedEOF):
+			if retries == 10 {
+				err := o.checkMetacacheState(ctx, rpc)
+				if err != nil {
+					return entries, fmt.Errorf("remote listing canceled: %w", err)
+				}
+				retries = -1
+			}
+			retries++
+			time.Sleep(retryDelay)
+			continue
+		case errors.Is(err, io.EOF):
+			return entries, io.EOF
+		}
+
+		// We got a stream to start at.
+		loadedPart := 0
+		buf := bufferPool.Get().(*bytes.Buffer)
+		defer func() {
+			buf.Reset()
+			bufferPool.Put(buf)
+		}()
+		for {
+			select {
+			case <-ctx.Done():
+				return entries, ctx.Err()
+			default:
+			}
+
+			if partN != loadedPart {
+				if retries > 10 {
+					err := o.checkMetacacheState(ctx, rpc)
+					if err != nil {
+						return entries, fmt.Errorf("waiting for next part %d: %w", partN, err)
+					}
+					retries = 1
+				}
+
+				if retries > 0 {
+					// Load from one disk only.
+					disks := er.getOnlineDisks()
+					if len(disks) == 0 {
+						time.Sleep(retryDelay)
+						retries++
+						continue
+					}
+
+					_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "", false)
+					if err != nil {
+						time.Sleep(retryDelay)
+						retries++
+						continue
+					}
+				}
+				// Load the current part's metadata...
+				fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}, true)
+				if err != nil {
+					time.Sleep(retryDelay)
+					retries++
+					continue
+				}
+				loadedPart = partN
+				bi, err := getMetacacheBlockInfo(fi, partN)
+				logger.LogIf(ctx, err)
+				if err == nil {
+					if bi.pastPrefix(o.Prefix) {
+						return entries, io.EOF
+					}
+				}
+			}
+			buf.Reset()
+			err := er.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0, fi.Size, buf, fi, metaArr, onlineDisks)
+			if err != nil {
+				switch toObjectErr(err, minioMetaBucket, o.objectPath(partN)).(type) {
+				case ObjectNotFound:
+					retries++
+					time.Sleep(retryDelay)
+					continue
+				case InsufficientReadQuorum:
+					retries++
+					time.Sleep(retryDelay)
+					continue
+				default:
+					logger.LogIf(ctx, err)
+					return entries, err
+				}
+			}
+			tmp, err := newMetacacheReader(buf)
+			if err != nil {
+				return entries, err
+			}
+			e, err := tmp.filter(o)
+			entries.o = append(entries.o, e.o...)
+			if o.Limit > 0 && entries.len() > o.Limit {
+				entries.truncate(o.Limit)
+				return entries, nil
+			}
+			if err == nil {
+				// We stopped within the listing, we are done for now...
+				return entries, nil
+			}
+			if !errors.Is(err, io.EOF) {
+				logger.LogIf(ctx, err)
+				return entries, err
+			}
+
+			// We finished at the end of the block.
+			// And should not expect any more results.
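+			// Check the block metadata to determine whether the stream has ended.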
+			bi, err := getMetacacheBlockInfo(fi, partN)
+			logger.LogIf(ctx, err)
+			if err != nil || bi.EOS {
+				// We are done and there are no more parts.
+				return entries, io.EOF
+			}
+			if bi.endedPrefix(o.Prefix) {
+				// Nothing more for prefix.
+				return entries, io.EOF
+			}
+			partN++
+			retries = 0
+		}
+	}
 }
 
 // Will return io.EOF if continuing would not yield more results.
@@ -434,14 +658,28 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
 		}
 	}()
 
-	wc := er.metaCache.Create(pathJoin(minioMetaBucket, o.objectPath()))
+	const retryDelay = 200 * time.Millisecond
+	const maxTries = 5
 	var bw *metacacheBlockWriter
 	// Don't save single object listings.
 	if !o.discardResult {
 		// Write results to disk.
 		bw = newMetacacheBlockWriter(cacheCh, func(b *metacacheBlock) error {
-			n, err := wc.Write(b.data)
+			// If the block is 0 bytes and it is the first block, skip it.
+			// This is only done for transient caches.
+			if len(b.data) == 0 && b.n == 0 && o.Transient {
+				return nil
+			}
+			o.debugln(color.Green("listPath:")+" saving block", b.n, "to", o.objectPath(b.n))
+			r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)))
+			logger.LogIf(ctx, err)
+			custom := b.headerKV()
+			_, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{
+				UserDefined:    custom,
+				NoLock:         true, // No need to hold namespace lock, each prefix caches uniquely.
+				ParentIsObject: nil,
+			})
 			if err != nil {
 				metaMu.Lock()
 				if meta.error != "" {
@@ -452,17 +690,29 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
 				cancel()
 				return err
 			}
-			if n != len(b.data) {
-				metaMu.Lock()
-				if meta.error != "" {
-					meta.status = scanStateError
-					meta.error = io.ErrShortWrite.Error()
-				}
-				metaMu.Unlock()
-				cancel()
-				return io.ErrShortWrite
+			if b.n == 0 {
+				return nil
+			}
+			// Update block 0 metadata.
+			var retries int
+			for {
+				err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), b.headerKV(), ObjectOptions{})
+				if err == nil {
+					break
+				}
+				switch err.(type) {
+				case ObjectNotFound:
+					return err
+				case InsufficientReadQuorum:
+				default:
+					logger.LogIf(ctx, err)
+				}
+				if retries >= maxTries {
+					return err
+				}
+				retries++
+				time.Sleep(retryDelay)
 			}
-			o.debugln(color.Green("listPath:")+" saving block to", o.objectPath())
 			return nil
 		})
 	}
@@ -522,13 +772,6 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
 			meta, _ = o.updateMetacacheListing(meta, rpc)
 			metaMu.Unlock()
 		}
-		if err := wc.Close(); err != nil {
-			metaMu.Lock()
-			meta.error = err.Error()
-			meta.status = scanStateError
-			meta, _ = o.updateMetacacheListing(meta, rpc)
-			metaMu.Unlock()
-		}
 	}
 }()
diff --git a/cmd/metacache.go b/cmd/metacache.go
index 5d268c621..9f0120b51 100644
--- a/cmd/metacache.go
+++ b/cmd/metacache.go
@@ -17,9 +17,14 @@ package cmd
 
 import (
+	"context"
+	"errors"
+	"fmt"
 	"path"
 	"strings"
 	"time"
+
+	"github.com/minio/minio/cmd/logger"
 )
 
 type scanStatus uint8
@@ -226,3 +231,21 @@ func (m *metacache) update(update metacache) {
 	}
 	m.fileNotFound = m.fileNotFound || update.fileNotFound
 }
+
+// delete all cache data on disks.
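+// The metacache prefix for this listing is removed from minioMetaBucket across all pools and sets.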
+func (m *metacache) delete(ctx context.Context) { + if m.bucket == "" || m.id == "" { + logger.LogIf(ctx, fmt.Errorf("metacache.delete: bucket (%s) or id (%s) empty", m.bucket, m.id)) + } + objAPI := newObjectLayerFn() + if objAPI == nil { + logger.LogIf(ctx, errors.New("metacache.delete: no object layer")) + return + } + ez, ok := objAPI.(*erasureServerPools) + if !ok { + logger.LogIf(ctx, errors.New("metacache.delete: expected objAPI to be *erasureServerPools")) + return + } + ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(m.bucket, m.id)) +} diff --git a/go.mod b/go.mod index d5108188b..26f744b79 100644 --- a/go.mod +++ b/go.mod @@ -70,6 +70,7 @@ require ( github.com/prometheus/client_golang v1.8.0 github.com/prometheus/client_model v0.2.0 github.com/prometheus/procfs v0.2.0 + github.com/quasilyte/go-ruleguard v0.2.1 // indirect github.com/rjeczalik/notify v0.9.2 github.com/rs/cors v1.7.0 github.com/secure-io/sio-go v0.3.1 @@ -80,7 +81,7 @@ require ( github.com/tidwall/gjson v1.6.7 github.com/tidwall/sjson v1.0.4 github.com/tinylib/msgp v1.1.3 - github.com/valyala/bytebufferpool v1.0.0 + github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31 // indirect github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a github.com/willf/bitset v1.1.11 // indirect github.com/willf/bloom v2.0.3+incompatible diff --git a/go.sum b/go.sum index 4341a130f..765f0b184 100644 --- a/go.sum +++ b/go.sum @@ -527,6 +527,8 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.2.0 h1:wH4vA7pcjKuZzjF7lM8awk4fnuJO6idemZXoKnULUx4= github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= +github.com/quasilyte/go-ruleguard v0.2.1 h1:56eRm0daAyny9UhJnmtJW/UyLZQusukBAB8oT8AHKHo= +github.com/quasilyte/go-ruleguard v0.2.1/go.mod h1:hN2rVc/uS4bQhQKTio2XaSJSafJwqBUWWwtssT3cQmc= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= @@ -595,11 +597,11 @@ github.com/tinylib/msgp v1.1.3 h1:3giwAkmtaEDLSV0MdO1lDLuPgklgPzmk8H9+So2BVfA= github.com/tinylib/msgp v1.1.3/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8 h1:ndzgwNDnKIqyCvHTXaCqh9KlOWKvBry6nuXMJmonVsE= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31 h1:OXcKh35JaYsGMRzpvFkLv/MEyPuL49CThT1pZ8aSml4= +github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= -github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= -github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a 
h1:0R4NLDRDZX6JcmhJgXi5E4b8Wg84ihbmUKp/GvSPEzc= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/willf/bitset v1.1.11 h1:N7Z7E9UvjW+sGsEl7k/SJrvY2reP1A07MrGuCjIOjRE= @@ -612,6 +614,7 @@ github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= +github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0= @@ -703,6 +706,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -774,6 +778,7 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.0.0-20200812195022-5ae4c3c160a0/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20201105001634-bc3cf281b174/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= diff --git a/pkg/objcache/objcache.go b/pkg/objcache/objcache.go deleted file mode 100644 index fb0e145c3..000000000 --- a/pkg/objcache/objcache.go +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2021 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Package objcache implements in memory caching methods. 
-package objcache - -import ( - "bytes" - "errors" - "io" - "sync" - "time" - - "github.com/valyala/bytebufferpool" -) - -const ( - // NoExpiry represents caches to be permanent and can only be deleted. - NoExpiry = time.Duration(0) - - // DefaultExpiry represents 1 hour time duration when all entries shall be expired. - DefaultExpiry = time.Hour - - // defaultBufferRatio represents default ratio used to calculate the - // individual cache entry buffer size. - defaultBufferRatio = uint64(10) -) - -var ( - // ErrKeyNotFoundInCache - key not found in cache. - ErrKeyNotFoundInCache = errors.New("Key not found in cache") - - // ErrCacheFull - cache is full. - ErrCacheFull = errors.New("Not enough space in cache") - - // ErrExcessData - excess data was attempted to be written on cache. - ErrExcessData = errors.New("Attempted excess write on cache") -) - -// buffer represents the in memory cache of a single entry. -// buffer carries value of the data and last accessed time. -type buffer struct { - buf *bytebufferpool.ByteBuffer - lastAccessed time.Time // Represents time when value was last accessed. -} - -// Cache holds the required variables to compose an in memory cache system -// which also provides expiring key mechanism and also maxSize. -type Cache struct { - // Mutex is used for handling the concurrent - // read/write requests for cache - mutex sync.Mutex - - // Once is used for resetting GC once after - // peak cache usage. - onceGC sync.Once - - // maxSize is a total size for overall cache - maxSize uint64 - - // maxCacheEntrySize is a total size per key buffer. - maxCacheEntrySize uint64 - - // currentSize is a current size in memory - currentSize uint64 - - // OnEviction - callback function for eviction - OnEviction func(key string) - - // totalEvicted counter to keep track of total expirys - totalEvicted int - - // map of cached keys and its values - entries map[string]*buffer - - // Expiry in time duration. - expiry time.Duration - - // Stop garbage collection routine, stops any running GC routine. - stopGC chan struct{} -} - -// New - Return a new cache with a given default expiry -// duration. If the expiry duration is less than one -// (or NoExpiry), the items in the cache never expire -// (by default), and must be deleted manually. -func New(maxSize uint64, expiry time.Duration) (c *Cache, err error) { - if maxSize == 0 { - err = errors.New("invalid maximum cache size") - return c, err - } - - // Max cache entry size - indicates the - // maximum buffer per key that can be held in - // memory. Currently this value is 1/10th - // the size of requested cache size. - maxCacheEntrySize := func() uint64 { - i := maxSize / defaultBufferRatio - if i == 0 { - i = maxSize - } - return i - }() - - c = &Cache{ - onceGC: sync.Once{}, - maxSize: maxSize, - maxCacheEntrySize: maxCacheEntrySize, - entries: make(map[string]*buffer), - expiry: expiry, - } - - // We have expiry start the janitor routine. - if expiry > 0 { - // Initialize a new stop GC channel. - c.stopGC = make(chan struct{}) - - // Start garbage collection routine to expire objects. - c.StartGC() - } - - return c, nil -} - -// Create - validates if object size fits with in cache size limit and returns a io.WriteCloser -// to which object contents can be written and finally Close()'d. During Close() we -// checks if the amount of data written is equal to the size of the object, in which -// case it saves the contents to object cache. 
-func (c *Cache) Create(key string) (wc io.WriteCloser) { - buf := bytebufferpool.Get() - - // Function called on close which saves the object contents - // to the object cache. - onClose := func() error { - c.mutex.Lock() - defer c.mutex.Unlock() - - if buf.Len() == 0 { - buf.Reset() - bytebufferpool.Put(buf) - - // If nothing is written in the buffer - // the key is not stored. - return nil - } - - if uint64(buf.Len()) > c.maxCacheEntrySize { - buf.Reset() - bytebufferpool.Put(buf) - - return ErrCacheFull - } - - // Full object available in buf, save it to cache. - c.entries[key] = &buffer{ - buf: buf, - lastAccessed: time.Now().UTC(), // Save last accessed time. - } - - // Account for the memory allocated above. - c.currentSize += uint64(buf.Len()) - return nil - } - - return &writeCloser{ByteBuffer: buf, onClose: onClose} -} - -// Open - open the in-memory file, returns an in memory reader. -// returns an error ErrKeyNotFoundInCache, if the key does not -// exist. ErrKeyNotFoundInCache is also returned if lastAccessed -// is older than input atime. -func (c *Cache) Open(key string, atime time.Time) (io.Reader, error) { - // Entry exists, return the readable buffer. - c.mutex.Lock() - defer c.mutex.Unlock() - b, ok := c.entries[key] - if !ok { - return nil, ErrKeyNotFoundInCache - } - - // Check if buf was recently accessed. - if b.lastAccessed.Before(atime) { - c.delete(key) - return nil, ErrKeyNotFoundInCache - } - - b.lastAccessed = time.Now() - return bytes.NewReader(b.buf.Bytes()), nil -} - -// Delete - delete deletes an entry from the cache. -func (c *Cache) Delete(key string) { - c.mutex.Lock() - c.delete(key) - c.mutex.Unlock() - if c.OnEviction != nil { - c.OnEviction(key) - } -} - -// gc - garbage collect all the expired entries from the cache. -func (c *Cache) gc() { - var evictedEntries []string - c.mutex.Lock() - for k, v := range c.entries { - if c.expiry > 0 && time.Now().UTC().Sub(v.lastAccessed) > c.expiry { - c.delete(k) - evictedEntries = append(evictedEntries, k) - } - } - c.mutex.Unlock() - for _, k := range evictedEntries { - if c.OnEviction != nil { - c.OnEviction(k) - } - } -} - -// StopGC sends a message to the expiry routine to stop -// expiring cached entries. NOTE: once this is called, cached -// entries will not be expired, be careful if you are using this. -func (c *Cache) StopGC() { - if c.stopGC != nil { - c.stopGC <- struct{}{} - } -} - -// StartGC starts running a routine ticking at expiry interval, -// on each interval this routine does a sweep across the cache -// entries and garbage collects all the expired entries. -func (c *Cache) StartGC() { - go func() { - for { - select { - // Wait till cleanup interval and initiate delete expired entries. - case <-time.After(c.expiry / 4): - c.gc() - // Stop the routine, usually called by the user of object cache during cleanup. - case <-c.stopGC: - return - } - } - }() -} - -// Deletes a requested entry from the cache. -func (c *Cache) delete(key string) { - if _, ok := c.entries[key]; ok { - deletedSize := uint64(c.entries[key].buf.Len()) - c.entries[key].buf.Reset() - bytebufferpool.Put(c.entries[key].buf) - delete(c.entries, key) - c.currentSize -= deletedSize - c.totalEvicted++ - } -} diff --git a/pkg/objcache/objcache_test.go b/pkg/objcache/objcache_test.go deleted file mode 100644 index b04b92ea2..000000000 --- a/pkg/objcache/objcache_test.go +++ /dev/null @@ -1,309 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2021 Minio, Inc. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package objcache - -import ( - "bytes" - "io" - "testing" - "time" -) - -// TestObjectCache tests cases of object cache with expiry. -func TestObjExpiry(t *testing.T) { - // Non exhaustive list of all object cache behavior cases. - testCases := []struct { - expiry time.Duration - cacheSize uint64 - err error - closeErr error - }{ - { - expiry: 100 * time.Millisecond, - cacheSize: 1024, - err: ErrKeyNotFoundInCache, - closeErr: nil, - }, - } - - // Test case 1 validates running of GC. - testCase := testCases[0] - cache, err := New(testCase.cacheSize, testCase.expiry) - if err != nil { - t.Fatalf("Unable to create new objcache") - } - - cache.OnEviction = func(key string) {} - w := cache.Create("test") - // Write a byte. - w.Write([]byte("1")) - if err = w.Close(); err != nil { - t.Errorf("Test case 1 expected to pass, failed instead %s", err) - } - // Wait for 500 millisecond. - time.Sleep(500 * time.Millisecond) - // Setting objModTime to the beginning of golang's time.Time to avoid deletion of stale entry. - fakeObjModTime := time.Time{} - _, err = cache.Open("test", fakeObjModTime) - if err != testCase.err { - t.Errorf("Test case 1 expected %s, got instead %s", testCase.err, err) - } -} - -// TestObjCache - tests various cases for object cache behavior. -func TestObjCache(t *testing.T) { - // Setting objModTime to the beginning of golang's time.Time to avoid deletion of stale entry. - fakeObjModTime := time.Time{} - - // Non exhaustive list of all object cache behavior cases. - testCases := []struct { - expiry time.Duration - cacheSize uint64 - err error - closeErr error - }{ - // Validate if a key is not found in cache and Open fails. - { - expiry: NoExpiry, - cacheSize: 1024, - err: ErrKeyNotFoundInCache, - }, - // Validate if cache indicates that it is full and Create fails. - { - expiry: NoExpiry, - cacheSize: 1, - err: ErrCacheFull, - }, - // Validate if Create succeeds but Close fails to write to buffer. - { - expiry: NoExpiry, - cacheSize: 2, - closeErr: io.ErrShortBuffer, - }, - // Validate that Create and Close succeed, making sure to update the cache. - { - expiry: NoExpiry, - cacheSize: 1024, - }, - // Validate that Delete succeeds and Open fails with key not found in cache. - { - expiry: NoExpiry, - cacheSize: 1024, - err: ErrKeyNotFoundInCache, - }, - // Validate OnEviction function is called upon entry delete. - { - expiry: NoExpiry, - cacheSize: 1024, - }, - // Validate error excess data. - { - expiry: NoExpiry, - cacheSize: 5, - closeErr: ErrExcessData, - }, - // Validate error excess data during write. - { - expiry: NoExpiry, - cacheSize: 2048, - err: ErrExcessData, - }, - } - - // Test 1 validating Open failure. 
- testCase := testCases[0] - cache, err := New(testCase.cacheSize, testCase.expiry) - if err != nil { - t.Fatalf("Unable to create new objcache") - } - - _, err = cache.Open("test", fakeObjModTime) - if testCase.err != err { - t.Errorf("Test case 2 expected to pass, failed instead %s", err) - } - - // Test 2 validating Create failure. - testCase = testCases[1] - cache, err = New(testCase.cacheSize, testCase.expiry) - if err != nil { - t.Fatalf("Unable to create new objcache") - } - - w := cache.Create("test") - if w == nil { - t.Errorf("Test case 2 expected to pass, but returned nil") - } - w.Close() - - // Test 3 validating Create succeeds and returns a writer. - // Subsequently we Close() without writing any data, to receive - // `io.ErrShortBuffer` - testCase = testCases[2] - cache, err = New(testCase.cacheSize, testCase.expiry) - if err != nil { - t.Fatalf("Unable to create new objcache") - } - - w = cache.Create("test") - // nothing is stored in the key, upon Close() the buf is discarded. - if err = w.Close(); err != nil { - t.Errorf("Test case 3 expected to pass, failed instead %s", err) - } - - // Test 4 validates Create and Close succeeds successfully caching - // the writes. - testCase = testCases[3] - cache, err = New(testCase.cacheSize, testCase.expiry) - if err != nil { - t.Fatalf("Unable to create new objcache") - } - - w = cache.Create("test") - // Write '5' bytes. - w.Write([]byte("Hello")) - // Close to successfully save into cache. - if err = w.Close(); err != nil { - t.Errorf("Test case 4 expected to pass, failed instead %s", err) - } - r, err := cache.Open("test", fakeObjModTime) - if err != nil { - t.Errorf("Test case 4 expected to pass, failed instead %s", err) - } - // Reads everything stored for key "test". - cbytes := make([]byte, 5) - rat := r.(io.ReaderAt) - _, err = rat.ReadAt(cbytes, 0) - if err != nil { - t.Errorf("Test case 4 expected to pass, failed instead %s", err) - } - // Validate if read bytes match. - if !bytes.Equal(cbytes, []byte("Hello")) { - t.Errorf("Test case 4 expected to pass. wanted \"Hello\", got %s", string(cbytes)) - } - - // Test 5 validates Delete succeeds and Open fails with err - testCase = testCases[4] - cache, err = New(testCase.cacheSize, testCase.expiry) - if err != nil { - t.Fatalf("Unable to create new objcache") - } - - w = cache.Create("test") - // Write '5' bytes. - w.Write([]byte("Hello")) - // Close to successfully save into cache. - if err = w.Close(); err != nil { - t.Errorf("Test case 5 expected to pass, failed instead %s", err) - } - // Delete the cache entry. - cache.Delete("test") - _, err = cache.Open("test", fakeObjModTime) - if testCase.err != err { - t.Errorf("Test case 5 expected to pass, failed instead %s", err) - } - - // Test 6 validates OnEviction being called upon Delete is being invoked. - testCase = testCases[5] - cache, err = New(testCase.cacheSize, testCase.expiry) - if err != nil { - t.Fatalf("Unable to create new objcache") - } - - w = cache.Create("test") - // Write '5' bytes. - w.Write([]byte("Hello")) - // Close to successfully save into cache. - if err = w.Close(); err != nil { - t.Errorf("Test case 6 expected to pass, failed instead %s", err) - } - var deleteKey string - cache.OnEviction = func(key string) { - deleteKey = key - } - // Delete the cache entry. - cache.Delete("test") - if deleteKey != "test" { - t.Errorf("Test case 6 expected to pass, wanted \"test\", got %s", deleteKey) - } - - // Test 7 validates rejecting requests when excess data is being saved. 
- testCase = testCases[6] - cache, err = New(testCase.cacheSize, testCase.expiry) - if err != nil { - t.Fatalf("Unable to create new objcache") - } - - w = cache.Create("test1") - // Write '5' bytes. - w.Write([]byte("Hello")) - // Close to successfully save into cache. - if err = w.Close(); err != nil { - t.Errorf("Test case 7 expected to pass, failed instead %s", err) - } - - w = cache.Create("test2") - // nothing got written, Close() will return success. - if err = w.Close(); err != nil { - t.Errorf("Test case 7 expected to pass, failed instead %s", err) - } - - // Test 8 validates rejecting Writes which write excess data. - testCase = testCases[7] - cache, err = New(testCase.cacheSize, testCase.expiry) - if err != nil { - t.Fatalf("Unable to create new objcache") - } - - w = cache.Create("test1") - defer w.Close() - - // Write '5' bytes. - n, err := w.Write([]byte("Hello")) - if err != nil { - t.Errorf("Test case 8 expected to pass, failed instead %s", err) - } - if n != 5 { - t.Errorf("Test case 8 expected 5 bytes written, instead found %d", n) - } - // Write '1' more byte, should return error. - n, err = w.Write([]byte("W")) - if n == 0 && err != testCase.err { - t.Errorf("Test case 8 expected to fail with ErrExcessData, but failed with %s instead", err) - } -} - -// TestStateEntryPurge - tests if objCache purges stale entry and returns ErrKeyNotFoundInCache. -func TestStaleEntryPurge(t *testing.T) { - cache, err := New(1024, NoExpiry) - if err != nil { - t.Fatalf("Unable to create new objcache") - } - - w := cache.Create("test") - // Write '5' bytes. - w.Write([]byte("Hello")) - // Close to successfully save into cache. - if err = w.Close(); err != nil { - t.Errorf("Test case expected to pass, failed instead %s", err) - } - - _, err = cache.Open("test", time.Now().AddDate(0, 0, 1).UTC()) - if err != ErrKeyNotFoundInCache { - t.Errorf("Test case expected to return ErrKeyNotFoundInCache, instead returned %s", err) - } -} diff --git a/pkg/objcache/write_closer.go b/pkg/objcache/write_closer.go deleted file mode 100644 index cd734f23e..000000000 --- a/pkg/objcache/write_closer.go +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2021 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Package objcache implements in memory caching methods. -package objcache - -import "github.com/valyala/bytebufferpool" - -// Is an Closer wrapper for bytebufferpool, upon close -// calls the defined onClose function. -type writeCloser struct { - *bytebufferpool.ByteBuffer - onClose func() error -} - -// On close, onClose() is called which checks if all object contents -// have been written so that it can save the buffer to the cache. -func (c writeCloser) Close() (err error) { - return c.onClose() -}