mirror of
https://github.com/versity/versitygw.git
synced 2026-03-27 18:05:00 +00:00
Merge pull request #1899 from versity/sis/upload-part-crc64nvme
feat: optimize multipart upload checksum calculation.
This commit is contained in:
@@ -121,6 +121,7 @@ const (
|
||||
versioningKey = "versioning"
|
||||
deleteMarkerKey = "delete-marker"
|
||||
versionIdKey = "version-id"
|
||||
partCrc64nvme = "part-crc64nvme"
|
||||
|
||||
nullVersionId = "null"
|
||||
|
||||
@@ -1669,13 +1670,6 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
|
||||
return res, "", fmt.Errorf("get mp checksums: %w", err)
|
||||
}
|
||||
|
||||
// The checksum algorithm should default to CRC64NVME
|
||||
// just for data integrity. It isn't going to be saved
|
||||
// in the final object metadata
|
||||
if checksums.Algorithm == "" {
|
||||
checksums.Algorithm = types.ChecksumAlgorithmCrc64nvme
|
||||
}
|
||||
|
||||
// ChecksumType should be the same as specified on CreateMultipartUpload
|
||||
if input.ChecksumType != "" && checksums.Type != input.ChecksumType {
|
||||
checksumType := checksums.Type
|
||||
@@ -1686,23 +1680,19 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
|
||||
return res, "", s3err.GetChecksumTypeMismatchOnMpErr(checksumType)
|
||||
}
|
||||
|
||||
// mpChecksumType holds the multipart upload checksum type
|
||||
mpChecksumType := checksums.Type
|
||||
|
||||
// The checksum type should default to FULL_OBJECT(crc64nvme)
|
||||
// The checksum type/algorithm should default to FULL_OBJECT(crc64nvme)
|
||||
if checksums.Type == "" {
|
||||
checksums.Type = types.ChecksumTypeFullObject
|
||||
checksums.Algorithm = types.ChecksumAlgorithmCrc64nvme
|
||||
}
|
||||
|
||||
// check all parts ok
|
||||
last := len(parts) - 1
|
||||
var totalsize int64
|
||||
|
||||
var composableCRC bool
|
||||
switch mpChecksumType {
|
||||
case types.ChecksumTypeFullObject:
|
||||
composableCRC = utils.IsChecksumComposable(checksums.Algorithm)
|
||||
}
|
||||
|
||||
// The initialie values is the lower limit of partNumber: 0
|
||||
var partNumber int32
|
||||
for i, part := range parts {
|
||||
@@ -1757,17 +1747,9 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
|
||||
return res, "", s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize)
|
||||
}
|
||||
|
||||
var hashRdr *utils.HashReader
|
||||
var compositeChecksumRdr *utils.CompositeChecksumReader
|
||||
switch checksums.Type {
|
||||
case types.ChecksumTypeFullObject:
|
||||
if !composableCRC {
|
||||
hashRdr, err = utils.NewHashReader(nil, "", utils.HashType(strings.ToLower(string(checksums.Algorithm))))
|
||||
if err != nil {
|
||||
return res, "", fmt.Errorf("initialize hash reader: %w", err)
|
||||
}
|
||||
}
|
||||
case types.ChecksumTypeComposite:
|
||||
if checksums.Type == types.ChecksumTypeComposite {
|
||||
// initialize the composite checksum reader if the mp checksum type is COMPOSITE
|
||||
compositeChecksumRdr, err = utils.NewCompositeChecksumReader(utils.HashType(strings.ToLower(string(checksums.Algorithm))))
|
||||
if err != nil {
|
||||
return res, "", fmt.Errorf("initialize composite checksum reader: %w", err)
|
||||
@@ -1798,26 +1780,33 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
|
||||
return res, "", fmt.Errorf("stat part %v: %v", *part.PartNumber, err)
|
||||
}
|
||||
|
||||
var rdr io.Reader = pf
|
||||
switch checksums.Type {
|
||||
case types.ChecksumTypeFullObject:
|
||||
if composableCRC {
|
||||
if i == 0 {
|
||||
composableCsum = getPartChecksum(checksums.Algorithm, part)
|
||||
break
|
||||
}
|
||||
composableCsum, err = utils.AddCRCChecksum(checksums.Algorithm,
|
||||
composableCsum, getPartChecksum(checksums.Algorithm, part),
|
||||
pfi.Size())
|
||||
var partChecksum string
|
||||
if mpChecksumType != "" {
|
||||
// if any checksum has been initially specifed on mp creation
|
||||
// read the part checksum configuration
|
||||
partChecksum = getPartChecksum(checksums.Algorithm, part)
|
||||
} else {
|
||||
// if no checksum has been specified on mp creation
|
||||
// retrieve the internally stored crc64nvme
|
||||
crc64nvme, err := p.meta.RetrieveAttribute(pf, bucket, partObjPath, partCrc64nvme)
|
||||
if err != nil {
|
||||
pf.Close()
|
||||
return res, "", fmt.Errorf("add part %v checksum: %w",
|
||||
*part.PartNumber, err)
|
||||
return res, "", fmt.Errorf("retrieve part internal crc64nvme: %s", err)
|
||||
}
|
||||
partChecksum = string(crc64nvme)
|
||||
}
|
||||
if i == 0 {
|
||||
composableCsum = partChecksum
|
||||
break
|
||||
}
|
||||
hashRdr.SetReader(rdr)
|
||||
rdr = hashRdr
|
||||
composableCsum, err = utils.AddCRCChecksum(checksums.Algorithm, composableCsum, partChecksum, pfi.Size())
|
||||
if err != nil {
|
||||
pf.Close()
|
||||
return res, "", fmt.Errorf("add part %v checksum: %w",
|
||||
*part.PartNumber, err)
|
||||
}
|
||||
case types.ChecksumTypeComposite:
|
||||
err := compositeChecksumRdr.Process(getPartChecksum(checksums.Algorithm, part))
|
||||
if err != nil {
|
||||
@@ -1835,10 +1824,10 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
|
||||
bucket, object, err)
|
||||
fw := f.File()
|
||||
fw.Seek(0, io.SeekEnd)
|
||||
_, err = io.Copy(fw, rdr)
|
||||
_, err = io.Copy(fw, pf)
|
||||
}
|
||||
} else {
|
||||
_, err = io.Copy(f.File(), rdr)
|
||||
_, err = io.Copy(f.File(), pf)
|
||||
}
|
||||
pf.Close()
|
||||
if err != nil {
|
||||
@@ -1937,11 +1926,7 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
|
||||
case types.ChecksumTypeComposite:
|
||||
value = fmt.Sprintf("%s-%v", compositeChecksumRdr.Sum(), len(parts))
|
||||
case types.ChecksumTypeFullObject:
|
||||
if !composableCRC {
|
||||
value = hashRdr.Sum()
|
||||
} else {
|
||||
value = composableCsum
|
||||
}
|
||||
value = composableCsum
|
||||
}
|
||||
|
||||
var gotSum *string
|
||||
@@ -2652,7 +2637,10 @@ func (p *Posix) UploadPartWithPostFunc(ctx context.Context, input *s3.UploadPart
|
||||
chRdr, chunkUpload := input.Body.(middlewares.ChecksumReader)
|
||||
isTrailingChecksum := chunkUpload && chRdr.Algorithm() != ""
|
||||
|
||||
var hashRdr *utils.HashReader
|
||||
// user input checksum algorithm: either with chunk uploads or with request headers
|
||||
var inputChAlgo utils.HashType
|
||||
// user input checksum value specifed with request headers
|
||||
var inputSum string
|
||||
|
||||
if !isTrailingChecksum {
|
||||
hashConfigs := []hashConfig{
|
||||
@@ -2665,27 +2653,20 @@ func (p *Posix) UploadPartWithPostFunc(ctx context.Context, input *s3.UploadPart
|
||||
|
||||
for _, config := range hashConfigs {
|
||||
if config.value != nil {
|
||||
hashRdr, err = utils.NewHashReader(tr, *config.value, config.hashType)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("initialize hash reader: %w", err)
|
||||
}
|
||||
|
||||
tr = hashRdr
|
||||
inputChAlgo = config.hashType
|
||||
inputSum = *config.value
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
checksums, chErr := p.retrieveChecksums(nil, bucket, mpPath)
|
||||
if chErr != nil && !errors.Is(chErr, meta.ErrNoSuchKey) {
|
||||
return nil, fmt.Errorf("retreive mp checksum: %w", chErr)
|
||||
}
|
||||
|
||||
var inputChAlgo utils.HashType
|
||||
if isTrailingChecksum {
|
||||
} else {
|
||||
inputChAlgo = utils.HashType(chRdr.Algorithm())
|
||||
}
|
||||
if hashRdr != nil {
|
||||
inputChAlgo = hashRdr.Type()
|
||||
|
||||
exposeChecksum := inputChAlgo != ""
|
||||
|
||||
checksums, err := p.retrieveChecksums(nil, bucket, mpPath)
|
||||
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
|
||||
return nil, fmt.Errorf("retreive mp checksum: %w", err)
|
||||
}
|
||||
|
||||
// If checksum isn't provided for the part,
|
||||
@@ -2704,18 +2685,53 @@ func (p *Posix) UploadPartWithPostFunc(ctx context.Context, input *s3.UploadPart
|
||||
}
|
||||
}
|
||||
|
||||
// if no checksum algorithm or precalculated checksum is
|
||||
// provided, but one has been on multipart upload initialization,
|
||||
// anyways calculate and store the uploaded part checksum
|
||||
if inputChAlgo == "" && checksums.Algorithm != "" {
|
||||
hashType := utils.HashType(strings.ToLower(string(checksums.Algorithm)))
|
||||
hashRdr, err = utils.NewHashReader(tr, "", hashType)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("initialize hash reader: %w", err)
|
||||
}
|
||||
inputChAlgo = hashType
|
||||
if inputChAlgo == "" {
|
||||
// default to crc64nvme
|
||||
inputChAlgo = utils.HashTypeCRC64NVME
|
||||
}
|
||||
|
||||
tr = hashRdr
|
||||
// hashreader is responsible to calculate and validate
|
||||
// user input checksums
|
||||
var hashRdr *utils.HashReader
|
||||
// crc64nvmeRdr is used to calculate the object crc64nvme
|
||||
// only for internal usage
|
||||
var crc64nvmeRdr *utils.HashReader
|
||||
|
||||
if checksums.Type == "" {
|
||||
if inputChAlgo != utils.HashTypeCRC64NVME {
|
||||
// if the input checksum algorithm isn't crc64nvme, create a
|
||||
// crc64nvme reader no matter if checksum is
|
||||
// received from chunk reader or request headers.
|
||||
crc64nvmeRdr, err = utils.NewHashReader(tr, "", utils.HashTypeCRC64NVME)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("initialize crc64nvme hash reader: %w", err)
|
||||
}
|
||||
tr = crc64nvmeRdr
|
||||
}
|
||||
|
||||
if !isTrailingChecksum {
|
||||
// create a new hash reader for the user input checksum for calculation
|
||||
// if the checksum doesn't come from chunk readers
|
||||
hashRdr, err = utils.NewHashReader(tr, inputSum, inputChAlgo)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("initialize hash reader: %w", err)
|
||||
}
|
||||
|
||||
tr = hashRdr
|
||||
}
|
||||
} else {
|
||||
// if no checksum algorithm or precalculated checksum is
|
||||
// provided, but one has been on multipart upload initialization,
|
||||
// anyways calculate and store the uploaded part checksum
|
||||
if !isTrailingChecksum {
|
||||
chAlgo := utils.HashType(strings.ToLower(string(checksums.Algorithm)))
|
||||
hashRdr, err = utils.NewHashReader(tr, inputSum, chAlgo)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("initialize hash reader: %w", err)
|
||||
}
|
||||
|
||||
tr = hashRdr
|
||||
}
|
||||
}
|
||||
|
||||
_, err = io.Copy(f, tr)
|
||||
@@ -2740,9 +2756,14 @@ func (p *Posix) UploadPartWithPostFunc(ctx context.Context, input *s3.UploadPart
|
||||
ETag: &etag,
|
||||
}
|
||||
|
||||
if inputChAlgo != "" {
|
||||
// if a checksum algorithm has been provided on mp initiation
|
||||
// the checksums should be stored, otherwise only returned
|
||||
// in the response without storing
|
||||
storeChecksum := checksums.Type != ""
|
||||
|
||||
if storeChecksum {
|
||||
checksum := s3response.Checksum{
|
||||
Algorithm: input.ChecksumAlgorithm,
|
||||
Algorithm: checksums.Algorithm,
|
||||
}
|
||||
|
||||
var sum string
|
||||
@@ -2754,30 +2775,64 @@ func (p *Posix) UploadPartWithPostFunc(ctx context.Context, input *s3.UploadPart
|
||||
}
|
||||
|
||||
// Assign the checksum
|
||||
switch inputChAlgo {
|
||||
case utils.HashTypeCRC32:
|
||||
switch checksums.Algorithm {
|
||||
case types.ChecksumAlgorithmCrc32:
|
||||
checksum.CRC32 = &sum
|
||||
res.ChecksumCRC32 = &sum
|
||||
case utils.HashTypeCRC32C:
|
||||
case types.ChecksumAlgorithmCrc32c:
|
||||
checksum.CRC32C = &sum
|
||||
res.ChecksumCRC32C = &sum
|
||||
case utils.HashTypeSha1:
|
||||
case types.ChecksumAlgorithmSha1:
|
||||
checksum.SHA1 = &sum
|
||||
res.ChecksumSHA1 = &sum
|
||||
case utils.HashTypeSha256:
|
||||
case types.ChecksumAlgorithmSha256:
|
||||
checksum.SHA256 = &sum
|
||||
res.ChecksumSHA256 = &sum
|
||||
case utils.HashTypeCRC64NVME:
|
||||
case types.ChecksumAlgorithmCrc64nvme:
|
||||
checksum.CRC64NVME = &sum
|
||||
res.ChecksumCRC64NVME = &sum
|
||||
}
|
||||
|
||||
// Store the checksums if the checksum type has been
|
||||
// specified on mp initialization
|
||||
if checksums.Type != "" {
|
||||
err := p.storeChecksums(f.File(), bucket, partPath, checksum)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("store checksum: %w", err)
|
||||
err := p.storeChecksums(f.File(), bucket, partPath, checksum)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("store checksum: %w", err)
|
||||
}
|
||||
} else {
|
||||
var internalCrc64NvmeSum string
|
||||
if inputChAlgo == utils.HashTypeCRC64NVME {
|
||||
if isTrailingChecksum {
|
||||
internalCrc64NvmeSum = chRdr.Checksum()
|
||||
} else {
|
||||
internalCrc64NvmeSum = hashRdr.Sum()
|
||||
}
|
||||
} else {
|
||||
internalCrc64NvmeSum = crc64nvmeRdr.Sum()
|
||||
}
|
||||
|
||||
err := p.meta.StoreAttribute(f.File(), bucket, partPath, partCrc64nvme, []byte(internalCrc64NvmeSum))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("store part internal crc64nvme: %w", err)
|
||||
}
|
||||
|
||||
if exposeChecksum {
|
||||
var sumToReturn string
|
||||
if isTrailingChecksum {
|
||||
sumToReturn = chRdr.Checksum()
|
||||
} else {
|
||||
sumToReturn = hashRdr.Sum()
|
||||
}
|
||||
|
||||
switch inputChAlgo {
|
||||
case utils.HashTypeCRC32:
|
||||
res.ChecksumCRC32 = &sumToReturn
|
||||
case utils.HashTypeCRC32C:
|
||||
res.ChecksumCRC32C = &sumToReturn
|
||||
case utils.HashTypeSha1:
|
||||
res.ChecksumSHA1 = &sumToReturn
|
||||
case utils.HashTypeSha256:
|
||||
res.ChecksumSHA256 = &sumToReturn
|
||||
case utils.HashTypeCRC64NVME:
|
||||
res.ChecksumCRC64NVME = &sumToReturn
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2948,6 +3003,7 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput)
|
||||
|
||||
// TODO: Should the checksum be recalculated or just copied ?
|
||||
var hashRdr *utils.HashReader
|
||||
var crc64nvmeRdr *utils.HashReader
|
||||
if mpChecksums.Algorithm != "" {
|
||||
if checksums.Algorithm == "" || mpChecksums.Algorithm != checksums.Algorithm {
|
||||
hashRdr, err = utils.NewHashReader(tr, "", utils.HashType(strings.ToLower(string(mpChecksums.Algorithm))))
|
||||
@@ -2957,6 +3013,14 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput)
|
||||
|
||||
tr = hashRdr
|
||||
}
|
||||
} else {
|
||||
// if not checksum has been specifed on multipart upload initiation
|
||||
// create an internal crc64nvme reader to calculate and stored the internal crc64nvme
|
||||
crc64nvmeRdr, err = utils.NewHashReader(tr, "", utils.HashTypeCRC64NVME)
|
||||
if err != nil {
|
||||
return s3response.CopyPartResult{}, fmt.Errorf("initialize internal crc64nvme reader: %w", err)
|
||||
}
|
||||
tr = crc64nvmeRdr
|
||||
}
|
||||
|
||||
_, err = io.Copy(f, tr)
|
||||
@@ -2979,6 +3043,7 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if hashRdr != nil {
|
||||
algo := types.ChecksumAlgorithm(strings.ToUpper(string(hashRdr.Type())))
|
||||
checksums = s3response.Checksum{
|
||||
@@ -3005,6 +3070,15 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput)
|
||||
}
|
||||
}
|
||||
|
||||
if crc64nvmeRdr != nil {
|
||||
// store the internal crc64nvme
|
||||
internalCrc64NvmeSum := crc64nvmeRdr.Sum()
|
||||
err := p.meta.StoreAttribute(f.File(), objPath, "", partCrc64nvme, []byte(internalCrc64NvmeSum))
|
||||
if err != nil {
|
||||
return s3response.CopyPartResult{}, fmt.Errorf("store part internal crc64nvme: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
etag := backend.GenerateEtag(hash)
|
||||
err = p.meta.StoreAttribute(f.File(), *upi.Bucket, partPath, etagkey, []byte(etag))
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user