From 64d4da5a37486fadd648ed923f94c71a03d52c1b Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Fri, 14 Jan 2022 10:01:25 -0800 Subject: [PATCH] Add Put input readahead (#14084) When reading input for PutObject or PutObjectPart add a readahead buffer for big inputs. This will make network reads+hashing separate run async with erasure coding and writes. This will reduce overall latency in distributed setups where the input is from upstream and writes go to other servers. We will read at 2 buffers ahead, meaning one will always be ready/waiting and one is currently being read from. This improves PutObject and PutObjectParts for these cases. --- cmd/erasure-multipart.go | 18 +++++++++++++++++- cmd/erasure-object.go | 17 ++++++++++++++++- cmd/xl-storage.go | 3 +++ go.mod | 2 +- go.sum | 4 ++-- 5 files changed, 39 insertions(+), 5 deletions(-) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 50b981681..c3899d51f 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -29,6 +29,7 @@ import ( "sync" "time" + "github.com/klauspost/readahead" "github.com/minio/minio-go/v7/pkg/set" xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" @@ -568,7 +569,22 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize()) } - n, err := erasure.Encode(pctx, data, writers, buffer, writeQuorum) + toEncode := io.Reader(data) + if data.Size() > bigFileThreshold { + // Add input readahead. + // We use 2 buffers, so we always have a full buffer of input. + bufA := er.bp.Get() + bufB := er.bp.Get() + defer er.bp.Put(bufA) + defer er.bp.Put(bufB) + ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]}) + if err == nil { + toEncode = ra + defer ra.Close() + } + } + + n, err := erasure.Encode(pctx, toEncode, writers, buffer, writeQuorum) closeBitrotWriters(writers) if err != nil { return pi, toObjectErr(err, bucket, object) diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 615b3bdc9..9fb548711 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -29,6 +29,7 @@ import ( "strings" "sync" + "github.com/klauspost/readahead" "github.com/minio/madmin-go" "github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio/internal/bucket/lifecycle" @@ -884,7 +885,21 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, shardFileSize, DefaultBitrotAlgorithm, erasure.ShardSize()) } - n, erasureErr := erasure.Encode(ctx, data, writers, buffer, writeQuorum) + toEncode := io.Reader(data) + if data.Size() > bigFileThreshold { + // We use 2 buffers, so we always have a full buffer of input. + bufA := er.bp.Get() + bufB := er.bp.Get() + defer er.bp.Put(bufA) + defer er.bp.Put(bufB) + ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]}) + if err == nil { + toEncode = ra + defer ra.Close() + } + logger.LogIf(ctx, err) + } + n, erasureErr := erasure.Encode(ctx, toEncode, writers, buffer, writeQuorum) closeBitrotWriters(writers) if erasureErr != nil { return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj) diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 9ea010248..34a1e5d9d 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -60,6 +60,9 @@ const ( // For hardrives it is possible to set this to a lower value to avoid any // spike in latency. But currently we are simply keeping it optimal for SSDs. + // bigFileThreshold is the point where we add readahead to put operations. + bigFileThreshold = 128 * humanize.MiByte + // XL metadata file carries per object metadata. xlStorageFormatFile = "xl.meta" ) diff --git a/go.mod b/go.mod index dacde7f00..13c7229f1 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/klauspost/compress v1.13.6 github.com/klauspost/cpuid/v2 v2.0.9 github.com/klauspost/pgzip v1.2.5 - github.com/klauspost/readahead v1.3.1 + github.com/klauspost/readahead v1.4.0 github.com/klauspost/reedsolomon v1.9.15 github.com/lib/pq v1.9.0 github.com/miekg/dns v1.1.43 diff --git a/go.sum b/go.sum index 8b184fddf..824f17ae8 100644 --- a/go.sum +++ b/go.sum @@ -955,8 +955,8 @@ github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBF github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE= github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= -github.com/klauspost/readahead v1.3.1 h1:QqXNYvm+VvqYcbrRT4LojUciM0XrznFRIDrbHiJtu/0= -github.com/klauspost/readahead v1.3.1/go.mod h1:AH9juHzNH7xqdqFHrMRSHeH2Ps+vFf+kblDqzPFiLJg= +github.com/klauspost/readahead v1.4.0 h1:w4hQ3BpdLjBnRQkZyNi+nwdHU7eGP9buTexWK9lU7gY= +github.com/klauspost/readahead v1.4.0/go.mod h1:7bolpMKhT5LKskLwYXGSDOyA2TYtMFgdgV0Y8gy7QhA= github.com/klauspost/reedsolomon v1.9.15 h1:g2erWKD2M6rgnPf89fCji6jNlhMKMdXcuNHMW1SYCIo= github.com/klauspost/reedsolomon v1.9.15/go.mod h1:eqPAcE7xar5CIzcdfwydOEdcmchAKAP/qs14y4GCBOk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=