From b23659927cb40a864556da5082ecb0c811ad714e Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Wed, 24 Feb 2021 15:51:41 -0800
Subject: [PATCH] fix: remove persistence layer for metacache store in memory (#11538)

Store the cache in memory instead of on disk to avoid large write
amplification for list-heavy workloads, and let entries auto-expire.
---
 cmd/erasure-server-pool.go    |  26 ---
 cmd/erasure-sets.go           |   8 +
 cmd/erasure.go                |   4 +
 cmd/metacache-bucket.go       | 162 +---------------
 cmd/metacache-bucket_test.go  |   2 +-
 cmd/metacache-manager.go      |  94 +---------
 cmd/metacache-server-pool.go  |   7 +-
 cmd/metacache-set.go          | 337 +++++-----------------------------
 cmd/metacache.go              |  23 ---
 go.mod                        |   3 +-
 go.sum                        |   9 +-
 pkg/objcache/objcache.go      | 270 +++++++++++++++++++++++++++
 pkg/objcache/objcache_test.go | 309 +++++++++++++++++++++++++++++++
 pkg/objcache/write_closer.go  |  34 ++++
 14 files changed, 683 insertions(+), 605 deletions(-)
 create mode 100644 pkg/objcache/objcache.go
 create mode 100644 pkg/objcache/objcache_test.go
 create mode 100644 pkg/objcache/write_closer.go

diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index 627702154..740f2c4db 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -857,7 +857,6 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre
 	ri := logger.GetReqInfo(ctx)
 	if ri != nil && strings.Contains(ri.UserAgent, `1.0 Veeam/1.0 Backup`) && strings.HasSuffix(prefix, ".blk") {
 		opts.discardResult = true
-		opts.Transient = true
 	}
 
 	merged, err := z.listPath(ctx, opts)
@@ -1215,31 +1214,6 @@ func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, fo
 	return nil
 }
 
-// deleteAll will delete a bucket+prefix unconditionally across all disks.
-// Note that set distribution is ignored so it should only be used in cases where
-// data is not distributed across sets.
-// Errors are logged but individual disk failures are not returned.
-func (z *erasureServerPools) deleteAll(ctx context.Context, bucket, prefix string) {
-	for _, servers := range z.serverPools {
-		for _, set := range servers.sets {
-			set.deleteAll(ctx, bucket, prefix)
-		}
-	}
-}
-
-// renameAll will rename bucket+prefix unconditionally across all disks to
-// minioMetaTmpBucket + unique uuid,
-// Note that set distribution is ignored so it should only be used in cases where
-// data is not distributed across sets. Errors are logged but individual
-// disk failures are not returned.
-func (z *erasureServerPools) renameAll(ctx context.Context, bucket, prefix string) {
-	for _, servers := range z.serverPools {
-		for _, set := range servers.sets {
-			set.renameAll(ctx, bucket, prefix)
-		}
-	}
-}
-
 // This function is used to undo a successful DeleteBucket operation.
func undoDeleteBucketServerPools(ctx context.Context, bucket string, serverPools []*erasureSets, errs []error) { g := errgroup.WithNErrs(len(serverPools)) diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index ef81e0ab6..0a807baef 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -29,6 +29,7 @@ import ( "time" "github.com/dchest/siphash" + "github.com/dustin/go-humanize" "github.com/google/uuid" "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio-go/v7/pkg/tags" @@ -37,6 +38,7 @@ import ( "github.com/minio/minio/pkg/console" "github.com/minio/minio/pkg/dsync" "github.com/minio/minio/pkg/madmin" + "github.com/minio/minio/pkg/objcache" "github.com/minio/minio/pkg/sync/errgroup" ) @@ -366,6 +368,11 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto // setCount * setDriveCount with each memory upto blockSizeV1. bp := bpool.NewBytePoolCap(n, blockSizeV1, blockSizeV1*2) + mcache, err := objcache.New(1*humanize.GiByte, objcache.DefaultExpiry) + if err != nil { + return nil, err + } + for i := 0; i < setCount; i++ { s.erasureDisks[i] = make([]StorageAPI, setDriveCount) } @@ -412,6 +419,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto getEndpoints: s.GetEndpoints(i), nsMutex: mutex, bp: bp, + metaCache: mcache, mrfOpCh: make(chan partialOperation, 10000), } } diff --git a/cmd/erasure.go b/cmd/erasure.go index 24784316d..63e2dec5f 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -30,6 +30,7 @@ import ( "github.com/minio/minio/pkg/color" "github.com/minio/minio/pkg/dsync" "github.com/minio/minio/pkg/madmin" + "github.com/minio/minio/pkg/objcache" "github.com/minio/minio/pkg/sync/errgroup" ) @@ -70,6 +71,9 @@ type erasureObjects struct { // Byte pools used for temporary i/o buffers. bp *bpool.BytePoolCap + // holds current list cache. + metaCache *objcache.Cache + mrfOpCh chan partialOperation } diff --git a/cmd/metacache-bucket.go b/cmd/metacache-bucket.go index 44d00ae2a..331625d69 100644 --- a/cmd/metacache-bucket.go +++ b/cmd/metacache-bucket.go @@ -17,11 +17,6 @@ package cmd import ( - "bytes" - "context" - "errors" - "fmt" - "net/http" "path" "runtime/debug" "sort" @@ -29,11 +24,8 @@ import ( "sync" "time" - "github.com/klauspost/compress/s2" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/console" - "github.com/minio/minio/pkg/hash" - "github.com/tinylib/msgp/msgp" ) //go:generate msgp -file $GOFILE -unexported @@ -57,16 +49,7 @@ type bucketMetacache struct { // newBucketMetacache creates a new bucketMetacache. // Optionally remove all existing caches. -func newBucketMetacache(bucket string, cleanup bool) *bucketMetacache { - if cleanup { - // Recursively delete all caches. - objAPI := newObjectLayerFn() - ez, ok := objAPI.(*erasureServerPools) - if ok { - ctx := context.Background() - ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(bucket, slashSeparator)) - } - } +func newBucketMetacache(bucket string) *bucketMetacache { return &bucketMetacache{ bucket: bucket, caches: make(map[string]metacache, 10), @@ -80,111 +63,6 @@ func (b *bucketMetacache) debugf(format string, data ...interface{}) { } } -// loadBucketMetaCache will load the cache from the object layer. -// If the cache cannot be found a new one is created. 
-func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache, error) { - objAPI := newObjectLayerFn() - for objAPI == nil { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - time.Sleep(250 * time.Millisecond) - } - objAPI = newObjectLayerFn() - if objAPI == nil { - logger.LogIf(ctx, fmt.Errorf("loadBucketMetaCache: object layer not ready. bucket: %q", bucket)) - } - } - - var meta bucketMetacache - var decErr error - // Use global context for this. - r, err := objAPI.GetObjectNInfo(GlobalContext, minioMetaBucket, pathJoin("buckets", bucket, ".metacache", "index.s2"), nil, http.Header{}, readLock, ObjectOptions{}) - if err == nil { - dec := s2DecPool.Get().(*s2.Reader) - dec.Reset(r) - decErr = meta.DecodeMsg(msgp.NewReader(dec)) - dec.Reset(nil) - r.Close() - s2DecPool.Put(dec) - } - if err != nil { - switch err.(type) { - case ObjectNotFound: - err = nil - case InsufficientReadQuorum: - // Cache is likely lost. Clean up and return new. - return newBucketMetacache(bucket, true), nil - default: - logger.LogIf(ctx, err) - } - return newBucketMetacache(bucket, false), err - } - if decErr != nil { - if errors.Is(err, context.Canceled) { - return newBucketMetacache(bucket, false), err - } - // Log the error, but assume the data is lost and return a fresh bucket. - // Otherwise a broken cache will never recover. - logger.LogIf(ctx, decErr) - return newBucketMetacache(bucket, true), nil - } - // Sanity check... - if meta.bucket != bucket { - logger.Info("loadBucketMetaCache: loaded cache name mismatch, want %s, got %s. Discarding.", bucket, meta.bucket) - return newBucketMetacache(bucket, true), nil - } - meta.cachesRoot = make(map[string][]string, len(meta.caches)/10) - // Index roots - for id, cache := range meta.caches { - meta.cachesRoot[cache.root] = append(meta.cachesRoot[cache.root], id) - } - return &meta, nil -} - -// save the bucket cache to the object storage. -func (b *bucketMetacache) save(ctx context.Context) error { - if b.transient { - return nil - } - objAPI := newObjectLayerFn() - if objAPI == nil { - return errServerNotInitialized - } - - // Keep lock while we marshal. - // We need a write lock since we update 'updated' - b.mu.Lock() - if !b.updated { - b.mu.Unlock() - return nil - } - // Save as s2 compressed msgpack - tmp := bytes.NewBuffer(make([]byte, 0, b.Msgsize())) - enc := s2.NewWriter(tmp) - err := msgp.Encode(enc, b) - if err != nil { - b.mu.Unlock() - return err - } - err = enc.Close() - if err != nil { - b.mu.Unlock() - return err - } - b.updated = false - b.mu.Unlock() - - hr, err := hash.NewReader(tmp, int64(tmp.Len()), "", "", int64(tmp.Len())) - if err != nil { - return err - } - _, err = objAPI.PutObject(ctx, minioMetaBucket, pathJoin("buckets", b.bucket, ".metacache", "index.s2"), NewPutObjReader(hr), ObjectOptions{}) - logger.LogIf(ctx, err) - return err -} - // findCache will attempt to find a matching cache for the provided options. // If a cache with the same ID exists already it will be returned. // If none can be found a new is created with the provided ID. @@ -448,41 +326,6 @@ func (b *bucketMetacache) getCache(id string) *metacache { return &c } -// deleteAll will delete all on disk data for ALL caches. -// Deletes are performed concurrently. 
-func (b *bucketMetacache) deleteAll() { - ctx := context.Background() - ez, ok := newObjectLayerFn().(*erasureServerPools) - if !ok { - logger.LogIf(ctx, errors.New("bucketMetacache: expected objAPI to be *erasurePools")) - return - } - - b.mu.Lock() - defer b.mu.Unlock() - - b.updated = true - if !b.transient { - // Delete all. - ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator)) - b.caches = make(map[string]metacache, 10) - b.cachesRoot = make(map[string][]string, 10) - return - } - - // Transient are in different buckets. - var wg sync.WaitGroup - for id := range b.caches { - wg.Add(1) - go func(cache metacache) { - defer wg.Done() - ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(cache.bucket, cache.id)) - }(b.caches[id]) - } - wg.Wait() - b.caches = make(map[string]metacache, 10) -} - // deleteCache will delete a specific cache and all files related to it across the cluster. func (b *bucketMetacache) deleteCache(id string) { b.mu.Lock() @@ -501,7 +344,4 @@ func (b *bucketMetacache) deleteCache(id string) { b.updated = true } b.mu.Unlock() - if ok { - c.delete(context.Background()) - } } diff --git a/cmd/metacache-bucket_test.go b/cmd/metacache-bucket_test.go index 1f157a4f2..a58057d09 100644 --- a/cmd/metacache-bucket_test.go +++ b/cmd/metacache-bucket_test.go @@ -6,7 +6,7 @@ import ( ) func Benchmark_bucketMetacache_findCache(b *testing.B) { - bm := newBucketMetacache("", false) + bm := newBucketMetacache("") const elements = 50000 const paths = 100 if elements%paths != 0 { diff --git a/cmd/metacache-manager.go b/cmd/metacache-manager.go index 480b06570..82276a12a 100644 --- a/cmd/metacache-manager.go +++ b/cmd/metacache-manager.go @@ -18,7 +18,6 @@ package cmd import ( "context" - "fmt" "runtime/debug" "sync" "time" @@ -42,25 +41,13 @@ type metacacheManager struct { trash map[string]metacache // Recently deleted lists. } -const metacacheManagerTransientBucket = "**transient**" const metacacheMaxEntries = 5000 // initManager will start async saving the cache. func (m *metacacheManager) initManager() { - // Add a transient bucket. - tb := newBucketMetacache(metacacheManagerTransientBucket, false) - tb.transient = true - m.buckets[metacacheManagerTransientBucket] = tb - // Start saver when object layer is ready. go func() { - objAPI := newObjectLayerFn() - for objAPI == nil { - time.Sleep(time.Second) - objAPI = newObjectLayerFn() - } if !globalIsErasure { - logger.Info("metacacheManager was initialized in non-erasure mode, skipping save") return } @@ -68,7 +55,6 @@ func (m *metacacheManager) initManager() { defer t.Stop() var exit bool - bg := context.Background() for !exit { select { case <-t.C: @@ -80,13 +66,11 @@ func (m *metacacheManager) initManager() { if !exit { v.cleanup() } - logger.LogIf(bg, v.save(bg)) } m.mu.RUnlock() m.mu.Lock() for k, v := range m.trash { if time.Since(v.lastUpdate) > metacacheMaxRunningAge { - v.delete(context.Background()) delete(m.trash, k) } } @@ -97,9 +81,6 @@ func (m *metacacheManager) initManager() { // findCache will get a metacache. 
func (m *metacacheManager) findCache(ctx context.Context, o listPathOptions) metacache { - if o.Transient || isReservedOrInvalidBucket(o.Bucket, false) { - return m.getTransient().findCache(o) - } m.mu.RLock() b, ok := m.buckets[o.Bucket] if ok { @@ -136,10 +117,6 @@ func (m *metacacheManager) updateCacheEntry(update metacache) (metacache, error) func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucketMetacache { m.init.Do(m.initManager) - // Return a transient bucket for invalid or system buckets. - if isReservedOrInvalidBucket(bucket, false) { - return m.getTransient() - } m.mu.RLock() b, ok := m.buckets[bucket] m.mu.RUnlock() @@ -163,16 +140,7 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket return b } - // Load bucket. If we fail return the transient bucket. - b, err := loadBucketMetaCache(ctx, bucket) - if err != nil { - m.mu.Unlock() - logger.LogIf(ctx, err) - return m.getTransient() - } - if b.bucket != bucket { - logger.LogIf(ctx, fmt.Errorf("getBucket: loaded bucket %s does not match this bucket %s", b.bucket, bucket)) - } + b = newBucketMetacache(bucket) m.buckets[bucket] = b m.mu.Unlock() return b @@ -195,10 +163,6 @@ func (m *metacacheManager) deleteBucketCache(bucket string) { b.mu.Lock() defer b.mu.Unlock() for k, v := range b.caches { - if time.Since(v.lastUpdate) > metacacheMaxRunningAge { - v.delete(context.Background()) - continue - } v.error = "Bucket deleted" v.status = scanStateError m.mu.Lock() @@ -212,59 +176,7 @@ func (m *metacacheManager) deleteAll() { m.init.Do(m.initManager) m.mu.Lock() defer m.mu.Unlock() - for bucket, b := range m.buckets { - b.deleteAll() - if !b.transient { - delete(m.buckets, bucket) - } + for bucket := range m.buckets { + delete(m.buckets, bucket) } } - -// getTransient will return a transient bucket. -func (m *metacacheManager) getTransient() *bucketMetacache { - m.init.Do(m.initManager) - m.mu.RLock() - bmc := m.buckets[metacacheManagerTransientBucket] - m.mu.RUnlock() - return bmc -} - -// checkMetacacheState should be used if data is not updating. -// Should only be called if a failure occurred. -func (o listPathOptions) checkMetacacheState(ctx context.Context, rpc *peerRESTClient) error { - // We operate on a copy... - o.Create = false - var cache metacache - if rpc == nil || o.Transient { - cache = localMetacacheMgr.findCache(ctx, o) - } else { - c, err := rpc.GetMetacacheListing(ctx, o) - if err != nil { - return err - } - cache = *c - } - - if cache.status == scanStateNone || cache.fileNotFound { - return errFileNotFound - } - if cache.status == scanStateSuccess || cache.status == scanStateStarted { - if time.Since(cache.lastUpdate) > metacacheMaxRunningAge { - // We got a stale entry, mark error on handling server. - err := fmt.Errorf("timeout: list %s not updated", cache.id) - cache.error = err.Error() - cache.status = scanStateError - if rpc == nil || o.Transient { - localMetacacheMgr.updateCacheEntry(cache) - } else { - rpc.UpdateMetacacheListing(ctx, cache) - } - return err - } - return nil - } - if cache.error != "" { - return fmt.Errorf("async cache listing failed with: %s", cache.error) - } - return nil -} diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go index 6dea435c7..553f4c95c 100644 --- a/cmd/metacache-server-pool.go +++ b/cmd/metacache-server-pool.go @@ -116,7 +116,6 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e // will be generated due to the marker without ID and this check failing. 
if o.Limit < 10 && o.Marker == "" && o.Create && o.Recursive { o.discardResult = true - o.Transient = true } var cache metacache @@ -127,12 +126,13 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e var cache metacache rpc := globalNotificationSys.restClientFromHash(o.Bucket) if isReservedOrInvalidBucket(o.Bucket, false) { + // discard all list caches for reserved buckets. + o.discardResult = true rpc = nil - o.Transient = true } // Apply prefix filter if enabled. o.SetFilter() - if rpc == nil || o.Transient { + if rpc == nil { // Local cache = localMetacacheMgr.findCache(ctx, o) } else { @@ -148,7 +148,6 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e if !errors.Is(err, context.DeadlineExceeded) { logger.LogIf(ctx, err) } - o.Transient = true cache = localMetacacheMgr.findCache(ctx, o) } else { cache = *c diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 4f42e42cd..207d699d1 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -17,23 +17,18 @@ package cmd import ( - "bytes" "context" "encoding/gob" - "encoding/json" "errors" "fmt" "io" - "strconv" "strings" "sync" "time" - jsoniter "github.com/json-iterator/go" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/color" "github.com/minio/minio/pkg/console" - "github.com/minio/minio/pkg/hash" ) type listPathOptions struct { @@ -91,11 +86,6 @@ type listPathOptions struct { // Include pure directories. IncludeDirectories bool - // Transient is set if the cache is transient due to an error or being a reserved bucket. - // This means the cache metadata will not be persisted on disk. - // A transient result will never be returned from the cache so knowing the list id is required. - Transient bool - // discardResult will not persist the cache to storage. // When the initial results are returned listing will be canceled. discardResult bool @@ -196,74 +186,14 @@ func (o *listPathOptions) gatherResults(in <-chan metaCacheEntry) func() (metaCa } } -// findFirstPart will find the part with 0 being the first that corresponds to the marker in the options. -// io.ErrUnexpectedEOF is returned if the place containing the marker hasn't been scanned yet. -// io.EOF indicates the marker is beyond the end of the stream and does not exist. -func (o *listPathOptions) findFirstPart(fi FileInfo) (int, error) { - search := o.Marker - if search == "" { - search = o.Prefix - } - if search == "" { - return 0, nil - } - o.debugln("searching for ", search) - var tmp metacacheBlock - var json = jsoniter.ConfigCompatibleWithStandardLibrary - i := 0 - for { - partKey := fmt.Sprintf("%s-metacache-part-%d", ReservedMetadataPrefixLower, i) - v, ok := fi.Metadata[partKey] - if !ok { - o.debugln("no match in metadata, waiting") - return -1, io.ErrUnexpectedEOF - } - err := json.Unmarshal([]byte(v), &tmp) - if !ok { - logger.LogIf(context.Background(), err) - return -1, err - } - if tmp.First == "" && tmp.Last == "" && tmp.EOS { - return 0, errFileNotFound - } - if tmp.First >= search { - o.debugln("First >= search", v) - return i, nil - } - if tmp.Last >= search { - o.debugln("Last >= search", v) - return i, nil - } - if tmp.EOS { - o.debugln("no match, at EOS", v) - return -3, io.EOF - } - o.debugln("First ", tmp.First, "<", search, " search", i) - i++ - } -} - // updateMetacacheListing will update the metacache listing. 
func (o *listPathOptions) updateMetacacheListing(m metacache, rpc *peerRESTClient) (metacache, error) { - if o.Transient { - return localMetacacheMgr.getTransient().updateCacheEntry(m) - } if rpc == nil { return localMetacacheMgr.updateCacheEntry(m) } return rpc.UpdateMetacacheListing(context.Background(), m) } -func getMetacacheBlockInfo(fi FileInfo, block int) (*metacacheBlock, error) { - var tmp metacacheBlock - partKey := fmt.Sprintf("%s-metacache-part-%d", ReservedMetadataPrefixLower, block) - v, ok := fi.Metadata[partKey] - if !ok { - return nil, io.ErrUnexpectedEOF - } - return &tmp, json.Unmarshal([]byte(v), &tmp) -} - const metacachePrefix = ".metacache" func metacachePrefixForID(bucket, id string) string { @@ -271,8 +201,8 @@ func metacachePrefixForID(bucket, id string) string { } // objectPath returns the object path of the cache. -func (o *listPathOptions) objectPath(block int) string { - return pathJoin(metacachePrefixForID(o.Bucket, o.ID), "block-"+strconv.Itoa(block)+".s2") +func (o *listPathOptions) objectPath() string { + return pathJoin(metacachePrefixForID(o.Bucket, o.ID), "block.s2") } func (o *listPathOptions) SetFilter() { @@ -357,187 +287,33 @@ func (r *metacacheReader) filter(o listPathOptions) (entries metaCacheEntriesSor } func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) { - retries := 0 - rpc := globalNotificationSys.restClientFromHash(o.Bucket) - - for { - select { - case <-ctx.Done(): - return entries, ctx.Err() - default: - } - - // If many failures, check the cache state. - if retries > 10 { - err := o.checkMetacacheState(ctx, rpc) - if err != nil { - return entries, fmt.Errorf("remote listing canceled: %w", err) - } - retries = 1 - } - - const retryDelay = 500 * time.Millisecond - // Load first part metadata... - // All operations are performed without locks, so we must be careful and allow for failures. - // Read metadata associated with the object from a disk. - if retries > 0 { - disks := er.getOnlineDisks() - if len(disks) == 0 { - time.Sleep(retryDelay) - retries++ - continue - } - - _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "", false) - if err != nil { - time.Sleep(retryDelay) - retries++ - continue - } - } - - // Read metadata associated with the object from all disks. - fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{}, true) - if err != nil { - switch toObjectErr(err, minioMetaBucket, o.objectPath(0)).(type) { - case ObjectNotFound: - retries++ - time.Sleep(retryDelay) - continue - case InsufficientReadQuorum: - retries++ - time.Sleep(retryDelay) - continue - default: - return entries, fmt.Errorf("reading first part metadata: %w", err) - } - } - - partN, err := o.findFirstPart(fi) - switch { - case err == nil: - case errors.Is(err, io.ErrUnexpectedEOF): - if retries == 10 { - err := o.checkMetacacheState(ctx, rpc) - if err != nil { - return entries, fmt.Errorf("remote listing canceled: %w", err) - } - retries = -1 - } - retries++ - time.Sleep(retryDelay) - continue - case errors.Is(err, io.EOF): - return entries, io.EOF - } - - // We got a stream to start at. 
- loadedPart := 0 - buf := bufferPool.Get().(*bytes.Buffer) - defer func() { - buf.Reset() - bufferPool.Put(buf) - }() - for { - select { - case <-ctx.Done(): - return entries, ctx.Err() - default: - } - - if partN != loadedPart { - if retries > 10 { - err := o.checkMetacacheState(ctx, rpc) - if err != nil { - return entries, fmt.Errorf("waiting for next part %d: %w", partN, err) - } - retries = 1 - } - - if retries > 0 { - // Load from one disk only - disks := er.getOnlineDisks() - if len(disks) == 0 { - time.Sleep(retryDelay) - retries++ - continue - } - - _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "", false) - if err != nil { - time.Sleep(retryDelay) - retries++ - continue - } - } - // Load first part metadata... - fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}, true) - if err != nil { - time.Sleep(retryDelay) - retries++ - continue - } - loadedPart = partN - bi, err := getMetacacheBlockInfo(fi, partN) - logger.LogIf(ctx, err) - if err == nil { - if bi.pastPrefix(o.Prefix) { - return entries, io.EOF - } - } - } - buf.Reset() - err := er.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0, fi.Size, buf, fi, metaArr, onlineDisks) - if err != nil { - switch toObjectErr(err, minioMetaBucket, o.objectPath(partN)).(type) { - case ObjectNotFound: - retries++ - time.Sleep(retryDelay) - continue - case InsufficientReadQuorum: - retries++ - time.Sleep(retryDelay) - continue - default: - logger.LogIf(ctx, err) - return entries, err - } - } - tmp, err := newMetacacheReader(buf) - if err != nil { - return entries, err - } - e, err := tmp.filter(o) - entries.o = append(entries.o, e.o...) - if o.Limit > 0 && entries.len() > o.Limit { - entries.truncate(o.Limit) - return entries, nil - } - if err == nil { - // We stopped within the listing, we are done for now... - return entries, nil - } - if !errors.Is(err, io.EOF) { - logger.LogIf(ctx, err) - return entries, err - } - - // We finished at the end of the block. - // And should not expect any more results. - bi, err := getMetacacheBlockInfo(fi, partN) - logger.LogIf(ctx, err) - if err != nil || bi.EOS { - // We are done and there are no more parts. - return entries, io.EOF - } - if bi.endedPrefix(o.Prefix) { - // Nothing more for prefix. - return entries, io.EOF - } - partN++ - retries = 0 - } + r, err := er.metaCache.Open(pathJoin(minioMetaBucket, o.objectPath()), time.Now().Add(-time.Hour)) + if err != nil { + return entries, io.EOF } + + tmp, err := newMetacacheReader(r) + if err != nil { + return entries, err + } + + e, err := tmp.filter(o) + entries.o = append(entries.o, e.o...) + if o.Limit > 0 && entries.len() > o.Limit { + entries.truncate(o.Limit) + return entries, nil + } + + if err == nil { + // We stopped within the listing, we are done for now... + return entries, nil + } + + if !errors.Is(err, io.EOF) { + logger.LogIf(ctx, err) + } + + return entries, err } // Will return io.EOF if continuing would not yield more results. @@ -658,28 +434,14 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr } }() - const retryDelay = 200 * time.Millisecond - const maxTries = 5 + wc := er.metaCache.Create(pathJoin(minioMetaBucket, o.objectPath())) var bw *metacacheBlockWriter // Don't save single object listings. if !o.discardResult { // Write results to disk. bw = newMetacacheBlockWriter(cacheCh, func(b *metacacheBlock) error { - // if the block is 0 bytes and its a first block skip it. 
- // skip only this for Transient caches. - if len(b.data) == 0 && b.n == 0 && o.Transient { - return nil - } - o.debugln(color.Green("listPath:")+" saving block", b.n, "to", o.objectPath(b.n)) - r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data))) - logger.LogIf(ctx, err) - custom := b.headerKV() - _, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{ - UserDefined: custom, - NoLock: true, // No need to hold namespace lock, each prefix caches uniquely. - ParentIsObject: nil, - }) + n, err := wc.Write(b.data) if err != nil { metaMu.Lock() if meta.error != "" { @@ -690,29 +452,17 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr cancel() return err } - if b.n == 0 { - return nil - } - // Update block 0 metadata. - var retries int - for { - err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), b.headerKV(), ObjectOptions{}) - if err == nil { - break - } - switch err.(type) { - case ObjectNotFound: - return err - case InsufficientReadQuorum: - default: - logger.LogIf(ctx, err) - } - if retries >= maxTries { - return err - } - retries++ - time.Sleep(retryDelay) + if n != len(b.data) { + metaMu.Lock() + if meta.error != "" { + meta.status = scanStateError + meta.error = io.ErrShortWrite.Error() + } + metaMu.Unlock() + cancel() + return io.ErrShortWrite } + o.debugln(color.Green("listPath:")+" saving block to", o.objectPath()) return nil }) } @@ -772,6 +522,13 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr meta, _ = o.updateMetacacheListing(meta, rpc) metaMu.Unlock() } + if err := wc.Close(); err != nil { + metaMu.Lock() + meta.error = err.Error() + meta.status = scanStateError + meta, _ = o.updateMetacacheListing(meta, rpc) + metaMu.Unlock() + } } }() diff --git a/cmd/metacache.go b/cmd/metacache.go index 9f0120b51..5d268c621 100644 --- a/cmd/metacache.go +++ b/cmd/metacache.go @@ -17,14 +17,9 @@ package cmd import ( - "context" - "errors" - "fmt" "path" "strings" "time" - - "github.com/minio/minio/cmd/logger" ) type scanStatus uint8 @@ -231,21 +226,3 @@ func (m *metacache) update(update metacache) { } m.fileNotFound = m.fileNotFound || update.fileNotFound } - -// delete all cache data on disks. 
-func (m *metacache) delete(ctx context.Context) { - if m.bucket == "" || m.id == "" { - logger.LogIf(ctx, fmt.Errorf("metacache.delete: bucket (%s) or id (%s) empty", m.bucket, m.id)) - } - objAPI := newObjectLayerFn() - if objAPI == nil { - logger.LogIf(ctx, errors.New("metacache.delete: no object layer")) - return - } - ez, ok := objAPI.(*erasureServerPools) - if !ok { - logger.LogIf(ctx, errors.New("metacache.delete: expected objAPI to be *erasureServerPools")) - return - } - ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(m.bucket, m.id)) -} diff --git a/go.mod b/go.mod index 26f744b79..d5108188b 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,6 @@ require ( github.com/prometheus/client_golang v1.8.0 github.com/prometheus/client_model v0.2.0 github.com/prometheus/procfs v0.2.0 - github.com/quasilyte/go-ruleguard v0.2.1 // indirect github.com/rjeczalik/notify v0.9.2 github.com/rs/cors v1.7.0 github.com/secure-io/sio-go v0.3.1 @@ -81,7 +80,7 @@ require ( github.com/tidwall/gjson v1.6.7 github.com/tidwall/sjson v1.0.4 github.com/tinylib/msgp v1.1.3 - github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31 // indirect + github.com/valyala/bytebufferpool v1.0.0 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a github.com/willf/bitset v1.1.11 // indirect github.com/willf/bloom v2.0.3+incompatible diff --git a/go.sum b/go.sum index 765f0b184..4341a130f 100644 --- a/go.sum +++ b/go.sum @@ -527,8 +527,6 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.2.0 h1:wH4vA7pcjKuZzjF7lM8awk4fnuJO6idemZXoKnULUx4= github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= -github.com/quasilyte/go-ruleguard v0.2.1 h1:56eRm0daAyny9UhJnmtJW/UyLZQusukBAB8oT8AHKHo= -github.com/quasilyte/go-ruleguard v0.2.1/go.mod h1:hN2rVc/uS4bQhQKTio2XaSJSafJwqBUWWwtssT3cQmc= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= @@ -597,11 +595,11 @@ github.com/tinylib/msgp v1.1.3 h1:3giwAkmtaEDLSV0MdO1lDLuPgklgPzmk8H9+So2BVfA= github.com/tinylib/msgp v1.1.3/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8 h1:ndzgwNDnKIqyCvHTXaCqh9KlOWKvBry6nuXMJmonVsE= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= -github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31 h1:OXcKh35JaYsGMRzpvFkLv/MEyPuL49CThT1pZ8aSml4= -github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a 
h1:0R4NLDRDZX6JcmhJgXi5E4b8Wg84ihbmUKp/GvSPEzc= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/willf/bitset v1.1.11 h1:N7Z7E9UvjW+sGsEl7k/SJrvY2reP1A07MrGuCjIOjRE= @@ -614,7 +612,6 @@ github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= -github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0= @@ -706,7 +703,6 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -778,7 +774,6 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= -golang.org/x/tools v0.0.0-20200812195022-5ae4c3c160a0/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20201105001634-bc3cf281b174/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= diff --git a/pkg/objcache/objcache.go b/pkg/objcache/objcache.go new file mode 100644 index 000000000..fb0e145c3 --- /dev/null +++ b/pkg/objcache/objcache.go @@ -0,0 +1,270 @@ +/* + * Minio Cloud Storage, (C) 2021 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package objcache implements in memory caching methods. 
+package objcache + +import ( + "bytes" + "errors" + "io" + "sync" + "time" + + "github.com/valyala/bytebufferpool" +) + +const ( + // NoExpiry represents caches to be permanent and can only be deleted. + NoExpiry = time.Duration(0) + + // DefaultExpiry represents 1 hour time duration when all entries shall be expired. + DefaultExpiry = time.Hour + + // defaultBufferRatio represents default ratio used to calculate the + // individual cache entry buffer size. + defaultBufferRatio = uint64(10) +) + +var ( + // ErrKeyNotFoundInCache - key not found in cache. + ErrKeyNotFoundInCache = errors.New("Key not found in cache") + + // ErrCacheFull - cache is full. + ErrCacheFull = errors.New("Not enough space in cache") + + // ErrExcessData - excess data was attempted to be written on cache. + ErrExcessData = errors.New("Attempted excess write on cache") +) + +// buffer represents the in memory cache of a single entry. +// buffer carries value of the data and last accessed time. +type buffer struct { + buf *bytebufferpool.ByteBuffer + lastAccessed time.Time // Represents time when value was last accessed. +} + +// Cache holds the required variables to compose an in memory cache system +// which also provides expiring key mechanism and also maxSize. +type Cache struct { + // Mutex is used for handling the concurrent + // read/write requests for cache + mutex sync.Mutex + + // Once is used for resetting GC once after + // peak cache usage. + onceGC sync.Once + + // maxSize is a total size for overall cache + maxSize uint64 + + // maxCacheEntrySize is a total size per key buffer. + maxCacheEntrySize uint64 + + // currentSize is a current size in memory + currentSize uint64 + + // OnEviction - callback function for eviction + OnEviction func(key string) + + // totalEvicted counter to keep track of total expirys + totalEvicted int + + // map of cached keys and its values + entries map[string]*buffer + + // Expiry in time duration. + expiry time.Duration + + // Stop garbage collection routine, stops any running GC routine. + stopGC chan struct{} +} + +// New - Return a new cache with a given default expiry +// duration. If the expiry duration is less than one +// (or NoExpiry), the items in the cache never expire +// (by default), and must be deleted manually. +func New(maxSize uint64, expiry time.Duration) (c *Cache, err error) { + if maxSize == 0 { + err = errors.New("invalid maximum cache size") + return c, err + } + + // Max cache entry size - indicates the + // maximum buffer per key that can be held in + // memory. Currently this value is 1/10th + // the size of requested cache size. + maxCacheEntrySize := func() uint64 { + i := maxSize / defaultBufferRatio + if i == 0 { + i = maxSize + } + return i + }() + + c = &Cache{ + onceGC: sync.Once{}, + maxSize: maxSize, + maxCacheEntrySize: maxCacheEntrySize, + entries: make(map[string]*buffer), + expiry: expiry, + } + + // We have expiry start the janitor routine. + if expiry > 0 { + // Initialize a new stop GC channel. + c.stopGC = make(chan struct{}) + + // Start garbage collection routine to expire objects. + c.StartGC() + } + + return c, nil +} + +// Create - validates if object size fits with in cache size limit and returns a io.WriteCloser +// to which object contents can be written and finally Close()'d. During Close() we +// checks if the amount of data written is equal to the size of the object, in which +// case it saves the contents to object cache. 
+func (c *Cache) Create(key string) (wc io.WriteCloser) { + buf := bytebufferpool.Get() + + // Function called on close which saves the object contents + // to the object cache. + onClose := func() error { + c.mutex.Lock() + defer c.mutex.Unlock() + + if buf.Len() == 0 { + buf.Reset() + bytebufferpool.Put(buf) + + // If nothing is written in the buffer + // the key is not stored. + return nil + } + + if uint64(buf.Len()) > c.maxCacheEntrySize { + buf.Reset() + bytebufferpool.Put(buf) + + return ErrCacheFull + } + + // Full object available in buf, save it to cache. + c.entries[key] = &buffer{ + buf: buf, + lastAccessed: time.Now().UTC(), // Save last accessed time. + } + + // Account for the memory allocated above. + c.currentSize += uint64(buf.Len()) + return nil + } + + return &writeCloser{ByteBuffer: buf, onClose: onClose} +} + +// Open - open the in-memory file, returns an in memory reader. +// returns an error ErrKeyNotFoundInCache, if the key does not +// exist. ErrKeyNotFoundInCache is also returned if lastAccessed +// is older than input atime. +func (c *Cache) Open(key string, atime time.Time) (io.Reader, error) { + // Entry exists, return the readable buffer. + c.mutex.Lock() + defer c.mutex.Unlock() + b, ok := c.entries[key] + if !ok { + return nil, ErrKeyNotFoundInCache + } + + // Check if buf was recently accessed. + if b.lastAccessed.Before(atime) { + c.delete(key) + return nil, ErrKeyNotFoundInCache + } + + b.lastAccessed = time.Now() + return bytes.NewReader(b.buf.Bytes()), nil +} + +// Delete - delete deletes an entry from the cache. +func (c *Cache) Delete(key string) { + c.mutex.Lock() + c.delete(key) + c.mutex.Unlock() + if c.OnEviction != nil { + c.OnEviction(key) + } +} + +// gc - garbage collect all the expired entries from the cache. +func (c *Cache) gc() { + var evictedEntries []string + c.mutex.Lock() + for k, v := range c.entries { + if c.expiry > 0 && time.Now().UTC().Sub(v.lastAccessed) > c.expiry { + c.delete(k) + evictedEntries = append(evictedEntries, k) + } + } + c.mutex.Unlock() + for _, k := range evictedEntries { + if c.OnEviction != nil { + c.OnEviction(k) + } + } +} + +// StopGC sends a message to the expiry routine to stop +// expiring cached entries. NOTE: once this is called, cached +// entries will not be expired, be careful if you are using this. +func (c *Cache) StopGC() { + if c.stopGC != nil { + c.stopGC <- struct{}{} + } +} + +// StartGC starts running a routine ticking at expiry interval, +// on each interval this routine does a sweep across the cache +// entries and garbage collects all the expired entries. +func (c *Cache) StartGC() { + go func() { + for { + select { + // Wait till cleanup interval and initiate delete expired entries. + case <-time.After(c.expiry / 4): + c.gc() + // Stop the routine, usually called by the user of object cache during cleanup. + case <-c.stopGC: + return + } + } + }() +} + +// Deletes a requested entry from the cache. +func (c *Cache) delete(key string) { + if _, ok := c.entries[key]; ok { + deletedSize := uint64(c.entries[key].buf.Len()) + c.entries[key].buf.Reset() + bytebufferpool.Put(c.entries[key].buf) + delete(c.entries, key) + c.currentSize -= deletedSize + c.totalEvicted++ + } +} diff --git a/pkg/objcache/objcache_test.go b/pkg/objcache/objcache_test.go new file mode 100644 index 000000000..b04b92ea2 --- /dev/null +++ b/pkg/objcache/objcache_test.go @@ -0,0 +1,309 @@ +/* + * Minio Cloud Storage, (C) 2021 Minio, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package objcache + +import ( + "bytes" + "io" + "testing" + "time" +) + +// TestObjectCache tests cases of object cache with expiry. +func TestObjExpiry(t *testing.T) { + // Non exhaustive list of all object cache behavior cases. + testCases := []struct { + expiry time.Duration + cacheSize uint64 + err error + closeErr error + }{ + { + expiry: 100 * time.Millisecond, + cacheSize: 1024, + err: ErrKeyNotFoundInCache, + closeErr: nil, + }, + } + + // Test case 1 validates running of GC. + testCase := testCases[0] + cache, err := New(testCase.cacheSize, testCase.expiry) + if err != nil { + t.Fatalf("Unable to create new objcache") + } + + cache.OnEviction = func(key string) {} + w := cache.Create("test") + // Write a byte. + w.Write([]byte("1")) + if err = w.Close(); err != nil { + t.Errorf("Test case 1 expected to pass, failed instead %s", err) + } + // Wait for 500 millisecond. + time.Sleep(500 * time.Millisecond) + // Setting objModTime to the beginning of golang's time.Time to avoid deletion of stale entry. + fakeObjModTime := time.Time{} + _, err = cache.Open("test", fakeObjModTime) + if err != testCase.err { + t.Errorf("Test case 1 expected %s, got instead %s", testCase.err, err) + } +} + +// TestObjCache - tests various cases for object cache behavior. +func TestObjCache(t *testing.T) { + // Setting objModTime to the beginning of golang's time.Time to avoid deletion of stale entry. + fakeObjModTime := time.Time{} + + // Non exhaustive list of all object cache behavior cases. + testCases := []struct { + expiry time.Duration + cacheSize uint64 + err error + closeErr error + }{ + // Validate if a key is not found in cache and Open fails. + { + expiry: NoExpiry, + cacheSize: 1024, + err: ErrKeyNotFoundInCache, + }, + // Validate if cache indicates that it is full and Create fails. + { + expiry: NoExpiry, + cacheSize: 1, + err: ErrCacheFull, + }, + // Validate if Create succeeds but Close fails to write to buffer. + { + expiry: NoExpiry, + cacheSize: 2, + closeErr: io.ErrShortBuffer, + }, + // Validate that Create and Close succeed, making sure to update the cache. + { + expiry: NoExpiry, + cacheSize: 1024, + }, + // Validate that Delete succeeds and Open fails with key not found in cache. + { + expiry: NoExpiry, + cacheSize: 1024, + err: ErrKeyNotFoundInCache, + }, + // Validate OnEviction function is called upon entry delete. + { + expiry: NoExpiry, + cacheSize: 1024, + }, + // Validate error excess data. + { + expiry: NoExpiry, + cacheSize: 5, + closeErr: ErrExcessData, + }, + // Validate error excess data during write. + { + expiry: NoExpiry, + cacheSize: 2048, + err: ErrExcessData, + }, + } + + // Test 1 validating Open failure. 
+ testCase := testCases[0] + cache, err := New(testCase.cacheSize, testCase.expiry) + if err != nil { + t.Fatalf("Unable to create new objcache") + } + + _, err = cache.Open("test", fakeObjModTime) + if testCase.err != err { + t.Errorf("Test case 2 expected to pass, failed instead %s", err) + } + + // Test 2 validating Create failure. + testCase = testCases[1] + cache, err = New(testCase.cacheSize, testCase.expiry) + if err != nil { + t.Fatalf("Unable to create new objcache") + } + + w := cache.Create("test") + if w == nil { + t.Errorf("Test case 2 expected to pass, but returned nil") + } + w.Close() + + // Test 3 validating Create succeeds and returns a writer. + // Subsequently we Close() without writing any data, to receive + // `io.ErrShortBuffer` + testCase = testCases[2] + cache, err = New(testCase.cacheSize, testCase.expiry) + if err != nil { + t.Fatalf("Unable to create new objcache") + } + + w = cache.Create("test") + // nothing is stored in the key, upon Close() the buf is discarded. + if err = w.Close(); err != nil { + t.Errorf("Test case 3 expected to pass, failed instead %s", err) + } + + // Test 4 validates Create and Close succeeds successfully caching + // the writes. + testCase = testCases[3] + cache, err = New(testCase.cacheSize, testCase.expiry) + if err != nil { + t.Fatalf("Unable to create new objcache") + } + + w = cache.Create("test") + // Write '5' bytes. + w.Write([]byte("Hello")) + // Close to successfully save into cache. + if err = w.Close(); err != nil { + t.Errorf("Test case 4 expected to pass, failed instead %s", err) + } + r, err := cache.Open("test", fakeObjModTime) + if err != nil { + t.Errorf("Test case 4 expected to pass, failed instead %s", err) + } + // Reads everything stored for key "test". + cbytes := make([]byte, 5) + rat := r.(io.ReaderAt) + _, err = rat.ReadAt(cbytes, 0) + if err != nil { + t.Errorf("Test case 4 expected to pass, failed instead %s", err) + } + // Validate if read bytes match. + if !bytes.Equal(cbytes, []byte("Hello")) { + t.Errorf("Test case 4 expected to pass. wanted \"Hello\", got %s", string(cbytes)) + } + + // Test 5 validates Delete succeeds and Open fails with err + testCase = testCases[4] + cache, err = New(testCase.cacheSize, testCase.expiry) + if err != nil { + t.Fatalf("Unable to create new objcache") + } + + w = cache.Create("test") + // Write '5' bytes. + w.Write([]byte("Hello")) + // Close to successfully save into cache. + if err = w.Close(); err != nil { + t.Errorf("Test case 5 expected to pass, failed instead %s", err) + } + // Delete the cache entry. + cache.Delete("test") + _, err = cache.Open("test", fakeObjModTime) + if testCase.err != err { + t.Errorf("Test case 5 expected to pass, failed instead %s", err) + } + + // Test 6 validates OnEviction being called upon Delete is being invoked. + testCase = testCases[5] + cache, err = New(testCase.cacheSize, testCase.expiry) + if err != nil { + t.Fatalf("Unable to create new objcache") + } + + w = cache.Create("test") + // Write '5' bytes. + w.Write([]byte("Hello")) + // Close to successfully save into cache. + if err = w.Close(); err != nil { + t.Errorf("Test case 6 expected to pass, failed instead %s", err) + } + var deleteKey string + cache.OnEviction = func(key string) { + deleteKey = key + } + // Delete the cache entry. + cache.Delete("test") + if deleteKey != "test" { + t.Errorf("Test case 6 expected to pass, wanted \"test\", got %s", deleteKey) + } + + // Test 7 validates rejecting requests when excess data is being saved. 
+ testCase = testCases[6] + cache, err = New(testCase.cacheSize, testCase.expiry) + if err != nil { + t.Fatalf("Unable to create new objcache") + } + + w = cache.Create("test1") + // Write '5' bytes. + w.Write([]byte("Hello")) + // Close to successfully save into cache. + if err = w.Close(); err != nil { + t.Errorf("Test case 7 expected to pass, failed instead %s", err) + } + + w = cache.Create("test2") + // nothing got written, Close() will return success. + if err = w.Close(); err != nil { + t.Errorf("Test case 7 expected to pass, failed instead %s", err) + } + + // Test 8 validates rejecting Writes which write excess data. + testCase = testCases[7] + cache, err = New(testCase.cacheSize, testCase.expiry) + if err != nil { + t.Fatalf("Unable to create new objcache") + } + + w = cache.Create("test1") + defer w.Close() + + // Write '5' bytes. + n, err := w.Write([]byte("Hello")) + if err != nil { + t.Errorf("Test case 8 expected to pass, failed instead %s", err) + } + if n != 5 { + t.Errorf("Test case 8 expected 5 bytes written, instead found %d", n) + } + // Write '1' more byte, should return error. + n, err = w.Write([]byte("W")) + if n == 0 && err != testCase.err { + t.Errorf("Test case 8 expected to fail with ErrExcessData, but failed with %s instead", err) + } +} + +// TestStateEntryPurge - tests if objCache purges stale entry and returns ErrKeyNotFoundInCache. +func TestStaleEntryPurge(t *testing.T) { + cache, err := New(1024, NoExpiry) + if err != nil { + t.Fatalf("Unable to create new objcache") + } + + w := cache.Create("test") + // Write '5' bytes. + w.Write([]byte("Hello")) + // Close to successfully save into cache. + if err = w.Close(); err != nil { + t.Errorf("Test case expected to pass, failed instead %s", err) + } + + _, err = cache.Open("test", time.Now().AddDate(0, 0, 1).UTC()) + if err != ErrKeyNotFoundInCache { + t.Errorf("Test case expected to return ErrKeyNotFoundInCache, instead returned %s", err) + } +} diff --git a/pkg/objcache/write_closer.go b/pkg/objcache/write_closer.go new file mode 100644 index 000000000..cd734f23e --- /dev/null +++ b/pkg/objcache/write_closer.go @@ -0,0 +1,34 @@ +/* + * Minio Cloud Storage, (C) 2021 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package objcache implements in memory caching methods. +package objcache + +import "github.com/valyala/bytebufferpool" + +// Is an Closer wrapper for bytebufferpool, upon close +// calls the defined onClose function. +type writeCloser struct { + *bytebufferpool.ByteBuffer + onClose func() error +} + +// On close, onClose() is called which checks if all object contents +// have been written so that it can save the buffer to the cache. +func (c writeCloser) Close() (err error) { + return c.onClose() +}
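Usage sketch (not part of the diff above; key name and sizes are illustrative only): the snippet shows how the new pkg/objcache API introduced by this patch fits together. New sizes the cache and starts the expiry GC, Create returns an io.WriteCloser whose Close() commits the buffered entry (returning ErrCacheFull if it exceeds the per-entry limit), and Open rejects entries whose last-access time is older than the supplied time.

package main

import (
	"fmt"
	"io/ioutil"
	"time"

	"github.com/minio/minio/pkg/objcache"
)

func main() {
	// 1 GiB total cache; entries expire after objcache.DefaultExpiry (one hour).
	cache, err := objcache.New(1<<30, objcache.DefaultExpiry)
	if err != nil {
		panic(err)
	}

	// Create returns a WriteCloser; the entry is stored only on Close().
	wc := cache.Create("bucket/.metacache/block.s2")
	if _, err := wc.Write([]byte("serialized listing blocks")); err != nil {
		panic(err)
	}
	if err := wc.Close(); err != nil {
		panic(err)
	}

	// Open returns ErrKeyNotFoundInCache when the key is missing or was
	// last accessed before the supplied time (stale-entry purge).
	r, err := cache.Open("bucket/.metacache/block.s2", time.Now().Add(-time.Hour))
	if err != nil {
		panic(err)
	}
	data, _ := ioutil.ReadAll(r)
	fmt.Println(string(data))
}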