diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go
index 4958ce594..ef8593470 100644
--- a/cmd/disk-cache-backend.go
+++ b/cmd/disk-cache-backend.go
@@ -654,6 +654,50 @@ func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, met
 	return jsonSave(f, m)
 }
 
+// updates the ETag and ModTime on cache with ETag from backend
+func (c *diskCache) updateMetadata(ctx context.Context, bucket, object, etag string, modTime time.Time, size int64) error {
+	cachedPath := getCacheSHADir(c.dir, bucket, object)
+	metaPath := pathJoin(cachedPath, cacheMetaJSONFile)
+	// Create cache directory if needed
+	if err := os.MkdirAll(cachedPath, 0777); err != nil {
+		return err
+	}
+	f, err := os.OpenFile(metaPath, os.O_RDWR, 0666)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	m := &cacheMeta{
+		Version: cacheMetaVersion,
+		Bucket:  bucket,
+		Object:  object,
+	}
+	if err := jsonLoad(f, m); err != nil && err != io.EOF {
+		return err
+	}
+	if m.Meta == nil {
+		m.Meta = make(map[string]string)
+	}
+	var key []byte
+	var objectEncryptionKey crypto.ObjectKey
+
+	if globalCacheKMS != nil {
+		// Calculating object encryption key
+		key, err = decryptObjectInfo(key, bucket, object, m.Meta)
+		if err != nil {
+			return err
+		}
+		copy(objectEncryptionKey[:], key)
+		m.Meta["etag"] = hex.EncodeToString(objectEncryptionKey.SealETag([]byte(etag)))
+	} else {
+		m.Meta["etag"] = etag
+	}
+	m.Meta["last-modified"] = modTime.UTC().Format(http.TimeFormat)
+	m.Meta["Content-Length"] = strconv.Itoa(int(size))
+	return jsonSave(f, m)
+}
+
 func getCacheSHADir(dir, bucket, object string) string {
 	return pathJoin(dir, getSHA256Hash([]byte(pathJoin(bucket, object))))
 }
@@ -755,22 +799,36 @@ func newCacheEncryptMetadata(bucket, object string, metadata map[string]string)
 	metadata[SSECacheEncrypted] = ""
 	return objectKey[:], nil
 }
+func (c *diskCache) GetLockContext(ctx context.Context, bucket, object string) (RWLocker, LockContext, error) {
+	cachePath := getCacheSHADir(c.dir, bucket, object)
+	cLock := c.NewNSLockFn(cachePath)
+	lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
+	return cLock, lkctx, err
+}
 
 // Caches the object to disk
-func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly bool) (oi ObjectInfo, err error) {
+func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly, writeback bool) (oi ObjectInfo, err error) {
 	if !c.diskSpaceAvailable(size) {
 		io.Copy(ioutil.Discard, data)
 		return oi, errDiskFull
 	}
-	cachePath := getCacheSHADir(c.dir, bucket, object)
-	cLock := c.NewNSLockFn(cachePath)
-	lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
+	cLock, lkctx, err := c.GetLockContext(ctx, bucket, object)
	if err != nil {
 		return oi, err
 	}
 	ctx = lkctx.Context()
 	defer cLock.Unlock(lkctx.Cancel)
+	return c.put(ctx, bucket, object, data, size, rs, opts, incHitsOnly, writeback)
+}
+
+// Caches the object to disk
+func (c *diskCache) put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly, writeback bool) (oi ObjectInfo, err error) {
+	if !c.diskSpaceAvailable(size) {
+		io.Copy(ioutil.Discard, data)
+		return oi, errDiskFull
+	}
+	cachePath := getCacheSHADir(c.dir, bucket, object)
 	meta, _, numHits, err := c.statCache(ctx, cachePath)
 	// Case where object not yet cached
 	if osIsNotExist(err) && c.after >= 1 {
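The `Put`/`put` split above follows a common Go locking pattern: the exported method owns lock acquisition, while the unexported variant assumes the caller already holds the namespace lock. That is what lets the new writethrough path in `cmd/disk-cache.go` take the lock once via `GetLockContext`, stream into the cache with `put`, and patch metadata with `updateMetadata` under the same lock. Below is a minimal standalone sketch of the pattern, with a plain `sync.Mutex` standing in for MinIO's namespace locker (simplified names, not MinIO source):

```go
package main

import "sync"

// cache stands in for diskCache; mu stands in for the namespace locker.
type cache struct {
	mu sync.Mutex
}

// Put is the public entry point: it owns lock acquisition and release.
func (c *cache) Put(data []byte) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.put(data)
}

// put assumes the caller already holds c.mu; a caller that has taken the
// lock itself (as the writethrough path does via GetLockContext) can call
// it directly without self-deadlocking on a second acquisition.
func (c *cache) put(data []byte) error {
	_ = data // ... write object data and metadata under the held lock ...
	return nil
}

func main() {
	c := &cache{}
	_ = c.Put([]byte("payload"))
}
```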
@@ -819,7 +877,7 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
 		removeAll(cachePath)
 		return oi, IncompleteBody{Bucket: bucket, Object: object}
 	}
-	if c.commitWriteback {
+	if writeback {
 		metadata["content-md5"] = md5sum
 		if md5bytes, err := base64.StdEncoding.DecodeString(md5sum); err == nil {
 			metadata["etag"] = hex.EncodeToString(md5bytes)
@@ -1073,7 +1131,7 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
 				// the remaining parts.
 				partOffset = 0
 			} // End of read all parts loop.
-			pr.CloseWithError(err)
+			pw.CloseWithError(err)
 		}()
 	} else {
 		go func() {
@@ -1105,8 +1163,15 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
 	return gr, numHits, nil
 }
 
+// deletes the cached object - caller should have taken write lock
+func (c *diskCache) delete(bucket, object string) (err error) {
+	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
+	return removeAll(cacheObjPath)
+}
+
 // Deletes the cached object
-func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error) {
+func (c *diskCache) Delete(ctx context.Context, bucket, object string) (err error) {
+	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
 	cLock := c.NewNSLockFn(cacheObjPath)
 	lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
@@ -1116,12 +1181,6 @@ func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error)
 	return removeAll(cacheObjPath)
 }
 
-// Deletes the cached object
-func (c *diskCache) Delete(ctx context.Context, bucket, object string) (err error) {
-	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
-	return c.delete(ctx, cacheObjPath)
-}
-
 // convenience function to check if object is cached on this diskCache
 func (c *diskCache) Exists(ctx context.Context, bucket, object string) bool {
 	if _, err := os.Stat(getCacheSHADir(c.dir, bucket, object)); err != nil {
@@ -1199,14 +1258,11 @@ func (c *diskCache) NewMultipartUpload(ctx context.Context, bucket, object, uID
 	}
 
 	m.Meta = opts.UserDefined
-	m.Meta[ReservedMetadataPrefix+"Encrypted-Multipart"] = ""
 
 	m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
-	if c.commitWriteback {
-		m.Meta[writeBackStatusHeader] = CommitPending.String()
-	}
 	m.Stat.ModTime = UTCNow()
 	if globalCacheKMS != nil {
+		m.Meta[ReservedMetadataPrefix+"Encrypted-Multipart"] = ""
 		if _, err := newCacheEncryptMetadata(bucket, object, m.Meta); err != nil {
 			return uploadID, err
 		}
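The one-character change in `Get` from `pr.CloseWithError(err)` to `pw.CloseWithError(err)` is a genuine bug fix, not a rename: with `io.Pipe`, only closing the write end delivers the error to the reader, whereas closing the read end surfaces `io.ErrClosedPipe` to the writer and the consumer never learns the underlying cause. A standalone demonstration using only the standard library:

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

func main() {
	pr, pw := io.Pipe()
	go func() {
		pw.Write([]byte("partial part data"))
		// Closing the *write* end with an error surfaces it to the reader.
		pw.CloseWithError(errors.New("cache part read failed"))
	}()
	buf := make([]byte, 32)
	for {
		n, err := pr.Read(buf)
		fmt.Printf("read %q err=%v\n", buf[:n], err)
		if err != nil {
			break // prints the real error, not io.EOF or io.ErrClosedPipe
		}
	}
}
```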
diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go
index ca9b24497..fa3856f8a 100644
--- a/cmd/disk-cache.go
+++ b/cmd/disk-cache.go
@@ -368,7 +368,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
 				// use a new context to avoid locker prematurely timing out operation when the GetObjectNInfo returns.
 				dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, rs, ObjectOptions{
 					UserDefined: getMetadata(bReader.ObjInfo),
-				}, false)
+				}, false, false)
 				return
 			}
 		}()
@@ -386,7 +386,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
 			io.LimitReader(pr, bkReader.ObjInfo.Size),
 			bkReader.ObjInfo.Size, rs, ObjectOptions{
 				UserDefined: userDefined,
-			}, false)
+			}, false, false)
 		// close the read end of the pipe, so the error gets
 		// propagated to teeReader
 		pr.CloseWithError(putErr)
@@ -678,31 +678,82 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
 		return putObjectFn(ctx, bucket, object, r, opts)
 	}
 	if c.commitWriteback {
-		oi, err := dcache.Put(ctx, bucket, object, r, r.Size(), nil, opts, false)
+		oi, err := dcache.Put(ctx, bucket, object, r, r.Size(), nil, opts, false, true)
 		if err != nil {
 			return ObjectInfo{}, err
 		}
 		go c.uploadObject(GlobalContext, oi)
 		return oi, nil
 	}
-	objInfo, err = putObjectFn(ctx, bucket, object, r, opts)
-
-	if err == nil {
-		go func() {
-			// fill cache in the background
-			bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, nil, http.Header{}, readLock, ObjectOptions{})
-			if bErr != nil {
-				return
-			}
-			defer bReader.Close()
-			oi, _, err := dcache.Stat(GlobalContext, bucket, object)
-			// avoid cache overwrite if another background routine filled cache
-			if err != nil || oi.ETag != bReader.ObjInfo.ETag {
-				dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, false)
-			}
-		}()
+	if !c.commitWritethrough {
+		objInfo, err = putObjectFn(ctx, bucket, object, r, opts)
+		if err == nil {
+			go func() {
+				// fill cache in the background
+				bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, nil, http.Header{}, readLock, ObjectOptions{})
+				if bErr != nil {
+					return
+				}
+				defer bReader.Close()
+				oi, _, err := dcache.Stat(GlobalContext, bucket, object)
+				// avoid cache overwrite if another background routine filled cache
+				if err != nil || oi.ETag != bReader.ObjInfo.ETag {
+					dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, false, true)
+				}
+			}()
+		}
+		return objInfo, err
 	}
-	return objInfo, err
+	cLock, lkctx, cerr := dcache.GetLockContext(GlobalContext, bucket, object)
+	if cerr != nil {
+		return putObjectFn(ctx, bucket, object, r, opts)
+	}
+	defer cLock.Unlock(lkctx.Cancel)
+	// Initialize pipe to stream data to backend
+	pipeReader, pipeWriter := io.Pipe()
+	hashReader, err := hash.NewReader(pipeReader, size, "", "", r.ActualSize())
+	if err != nil {
+		return
+	}
+	// Initialize pipe to stream data to cache
+	rPipe, wPipe := io.Pipe()
+	infoCh := make(chan ObjectInfo)
+	errorCh := make(chan error)
+	go func() {
+		info, err := putObjectFn(ctx, bucket, object, NewPutObjReader(hashReader), opts)
+		if err != nil {
+			close(infoCh)
+			pipeReader.CloseWithError(err)
+			rPipe.CloseWithError(err)
+			errorCh <- err
+			return
+		}
+		close(errorCh)
+		infoCh <- info
+	}()
+
+	go func() {
+		_, err := dcache.put(lkctx.Context(), bucket, object, rPipe, r.Size(), nil, opts, false, false)
+		if err != nil {
+			rPipe.CloseWithError(err)
+			return
+		}
+	}()
+
+	mwriter := cacheMultiWriter(pipeWriter, wPipe)
+	_, err = io.Copy(mwriter, r)
+	pipeWriter.Close()
+	wPipe.Close()
+
+	if err != nil {
+		err = <-errorCh
+		return ObjectInfo{}, err
+	}
+	info := <-infoCh
+	if cerr = dcache.updateMetadata(lkctx.Context(), bucket, object, info.ETag, info.ModTime, info.Size); cerr != nil {
+		dcache.delete(bucket, object)
+	}
+	return info, err
 }
 
 // upload cached object to backend in async commit mode.
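The new writethrough branch in `PutObject` streams one request body to two consumers at once: `cacheMultiWriter` (MinIO's multi-writer; `io.MultiWriter` is the standard-library analogue) feeds one pipe drained by the backend `putObjectFn` and a second pipe drained by `dcache.put`. Only after the backend reports success is the cached copy's metadata patched via `updateMetadata`, and the cached copy is deleted if that patch fails. A minimal standalone sketch of the fan-out, with `io.ReadAll` standing in for the two consumers (not MinIO source):

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

func main() {
	src := strings.NewReader("object payload")

	backendR, backendW := io.Pipe() // stream to the backend
	cacheR, cacheW := io.Pipe()     // stream to the cache

	var wg sync.WaitGroup
	for name, r := range map[string]io.Reader{"backend": backendR, "cache": cacheR} {
		wg.Add(1)
		go func(name string, r io.Reader) {
			defer wg.Done()
			b, err := io.ReadAll(r) // stand-in for putObjectFn / dcache.put
			fmt.Printf("%s consumed %d bytes (err=%v)\n", name, len(b), err)
		}(name, r)
	}

	// Copy the single source into both pipes simultaneously.
	mw := io.MultiWriter(backendW, cacheW)
	_, err := io.Copy(mw, src)
	backendW.Close()
	cacheW.Close()
	wg.Wait()
	fmt.Println("copy err:", err)
}
```

Note that `io.MultiWriter` stops on the first writer error; that is why the diff's backend goroutine closes both pipes with `CloseWithError` on failure, so a backend error aborts the `io.Copy` instead of leaving it blocked.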
@@ -922,7 +973,7 @@ func (c *cacheObjects) NewMultipartUpload(ctx context.Context, bucket, object st
 		dcache.Delete(ctx, bucket, object)
 		return newMultipartUploadFn(ctx, bucket, object, opts)
 	}
-	if !c.commitWritethrough {
+	if !c.commitWritethrough && !c.commitWriteback {
 		return newMultipartUploadFn(ctx, bucket, object, opts)
 	}
 
@@ -941,7 +992,7 @@ func (c *cacheObjects) PutObjectPart(ctx context.Context, bucket, object, upload
 		return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
 	}
 
-	if !c.commitWritethrough {
+	if !c.commitWritethrough && !c.commitWriteback {
 		return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
 	}
 	if c.skipCache() {
@@ -1039,7 +1090,7 @@ func (c *cacheObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject,
 		return copyObjectPartFn(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID, startOffset, length, srcInfo, srcOpts, dstOpts)
 	}
 
-	if !c.commitWritethrough {
+	if !c.commitWritethrough && !c.commitWriteback {
 		return copyObjectPartFn(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID, startOffset, length, srcInfo, srcOpts, dstOpts)
 	}
 	if err := dcache.uploadIDExists(dstBucket, dstObject, uploadID); err != nil {
@@ -1077,7 +1128,7 @@ func (c *cacheObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject,
 // finalizes the upload saved in cache multipart dir.
 func (c *cacheObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
 	completeMultipartUploadFn := c.InnerCompleteMultipartUploadFn
-	if !c.commitWritethrough {
+	if !c.commitWritethrough && !c.commitWriteback {
 		return completeMultipartUploadFn(ctx, bucket, object, uploadID, uploadedParts, opts)
 	}
 	dcache, err := c.getCacheToLoc(ctx, bucket, object)
@@ -1102,7 +1153,7 @@ func (c *cacheObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
 			oi, _, err := dcache.Stat(GlobalContext, bucket, object)
 			// avoid cache overwrite if another background routine filled cache
 			if err != nil || oi.ETag != bReader.ObjInfo.ETag {
-				dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, false)
+				dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, false, false)
 			}
 		}
 	}()
@@ -1113,7 +1164,7 @@ func (c *cacheObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
 // AbortMultipartUpload - aborts multipart upload on backend and cache.
 func (c *cacheObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
 	abortMultipartUploadFn := c.InnerAbortMultipartUploadFn
-	if !c.commitWritethrough {
+	if !c.commitWritethrough && !c.commitWriteback {
 		return abortMultipartUploadFn(ctx, bucket, object, uploadID, opts)
 	}
 	dcache, err := c.getCacheToLoc(ctx, bucket, object)
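The guard `!c.commitWritethrough && !c.commitWriteback` is now repeated across the five multipart entry points above. A possible follow-up cleanup (a suggestion only, not part of this diff) would name the predicate once; here is a standalone mirror of just the two fields for illustration:

```go
package main

import "fmt"

// cacheObjects mirrors only the two commit-mode fields used by the guard.
type cacheObjects struct {
	commitWriteback    bool
	commitWritethrough bool
}

// multipartCachingEnabled reports whether multipart uploads should go
// through the cache; per this diff that is true in both commit modes,
// since writeback only changes behavior for single PUTs.
func (c *cacheObjects) multipartCachingEnabled() bool {
	return c.commitWritethrough || c.commitWriteback
}

func main() {
	for _, c := range []cacheObjects{
		{commitWritethrough: true},
		{commitWriteback: true},
		{}, // default commit mode
	} {
		fmt.Printf("%+v -> cache multipart: %v\n", c, c.multipartCachingEnabled())
	}
}
```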
diff --git a/docs/disk-caching/DESIGN.md b/docs/disk-caching/DESIGN.md
index 7341f27dd..86c809235 100644
--- a/docs/disk-caching/DESIGN.md
+++ b/docs/disk-caching/DESIGN.md
@@ -93,13 +93,13 @@ master key to automatically encrypt all cached content.
 
 Note that cache KMS master key is not recommended for use in production deployments. If the MinIO server/gateway machine is ever compromised, the cache KMS master key must also be treated as compromised.
 Support for external KMS to manage cache KMS keys is on the roadmap, and would be ideal for production use cases.
 
-- `MINIO_CACHE_COMMIT` setting of `writethrough` allows caching of multipart uploads synchronously if enabled. By default, single PUT operations are already cached on write without any special setting.
+- `MINIO_CACHE_COMMIT` setting of `writethrough` allows caching of single and multipart uploads synchronously if enabled. By default, however, single PUT operations are cached asynchronously on write without any special setting.
 
 - Partially cached stale uploads older than 24 hours are automatically cleaned up.
 
 - Expiration happens automatically based on the configured interval as explained above, frequently accessed objects stay alive in cache for a significantly longer time.
 
-> NOTE: `MINIO_CACHE_COMMIT` also has a value of `writeback` which allows staging single uploads in cache before committing to remote. However, for consistency reasons, `writeback` staging uploads in the cache are not permitted for multipart uploads.
+> NOTE: `MINIO_CACHE_COMMIT` also has a value of `writeback`, which allows staging single uploads in cache before committing to remote. It is not possible to stage multipart uploads in the cache for consistency reasons; hence, multipart uploads are cached synchronously even if `writeback` is set.
 
 ### Crash Recovery
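The commit-mode matrix that these DESIGN.md changes describe can be summarized in a few lines. The sketch below is an illustration derived from the documentation text (not MinIO source): the client response waits for the backend commit in every case except a single PUT under `writeback`.

```go
package main

import "fmt"

// backendCommitIsSynchronous reports whether the client response waits for
// the backend commit under the documented MINIO_CACHE_COMMIT semantics.
func backendCommitIsSynchronous(commitMode string, multipart bool) bool {
	// writeback stages single PUTs in the cache and uploads them to the
	// backend asynchronously; multipart uploads always commit synchronously.
	return !(commitMode == "writeback" && !multipart)
}

func main() {
	fmt.Println(backendCommitIsSynchronous("writeback", false))    // false: staged in cache, async upload
	fmt.Println(backendCommitIsSynchronous("writeback", true))     // true: multipart falls back to writethrough
	fmt.Println(backendCommitIsSynchronous("writethrough", false)) // true: synchronous in both cases
}
```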