From d03a33110d9828788e94002a061d6db63b7574f7 Mon Sep 17 00:00:00 2001 From: niksis02 Date: Thu, 26 Feb 2026 19:14:30 +0400 Subject: [PATCH] feat: optimize multipart upload checksum calculation. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR optimizes multipart upload checksum handling. When a checksum algorithm/type is specified at multipart-upload initiation, each `UploadPart` request computes, validates, and stores the corresponding part checksum. During `CompleteMultipartUpload`, the final checksum is derived either via composite checksum calculation or by composing the CRC-family checksums. When **no** checksum algorithm is specified during multipart-upload initiation, each `UploadPart` may supply a different checksum algorithm for data-integrity verification. To support this scenario, a new mechanism has been implemented: for every `UploadPart`, a **crc64nvme** checksum is always computed. * If the client uses crc64nvme for the part upload, a single hash reader is used. * Otherwise, two hash readers are used—one for crc64nvme and one for the user-provided checksum. The crc64nvme value is stored in part xattrs under `user.part-crc64nvme` and later used during `CompleteMultipartUpload` as a composable checksum source. In `CompleteMultipartUpload`, the hash reader is entirely removed; the gateway no longer re-reads part data to compute the final checksum. The logic now follows two distinct paths: 1. **Checksum algorithm/type specified at MP initiation** * All required per-part checksums have already been stored. * If the checksum type is `FULL_OBJECT`, the gateway uses the composable path. * If the type is `COMPOSITE`, the gateway follows the checksum-combining path. 2. **No checksum algorithm specified at MP initiation** * The gateway loads the stored per-part `crc64nvme` values and composes them to compute the final checksum. 
The previous `composableCRC` check has been removed because all `FULL_OBJECT` algorithms are inherently composable (`crc32`, `crc32c`, `crc64nvme`). Validation now relies solely on `checksum.Type`. --- backend/posix/posix.go | 248 ++++++++++++++++++++++++++--------------- 1 file changed, 161 insertions(+), 87 deletions(-) diff --git a/backend/posix/posix.go b/backend/posix/posix.go index d05e18c2..00a9d75d 100644 --- a/backend/posix/posix.go +++ b/backend/posix/posix.go @@ -121,6 +121,7 @@ const ( versioningKey = "versioning" deleteMarkerKey = "delete-marker" versionIdKey = "version-id" + partCrc64nvme = "part-crc64nvme" nullVersionId = "null" @@ -1658,13 +1659,6 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C return res, "", fmt.Errorf("get mp checksums: %w", err) } - // The checksum algorithm should default to CRC64NVME - // just for data integrity. It isn't going to be saved - // in the final object metadata - if checksums.Algorithm == "" { - checksums.Algorithm = types.ChecksumAlgorithmCrc64nvme - } - // ChecksumType should be the same as specified on CreateMultipartUpload if input.ChecksumType != "" && checksums.Type != input.ChecksumType { checksumType := checksums.Type @@ -1675,23 +1669,19 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C return res, "", s3err.GetChecksumTypeMismatchOnMpErr(checksumType) } + // mpChecksumType holds the multipart upload checksum type mpChecksumType := checksums.Type - // The checksum type should default to FULL_OBJECT(crc64nvme) + // The checksum type/algorithm should default to FULL_OBJECT(crc64nvme) if checksums.Type == "" { checksums.Type = types.ChecksumTypeFullObject + checksums.Algorithm = types.ChecksumAlgorithmCrc64nvme } // check all parts ok last := len(parts) - 1 var totalsize int64 - var composableCRC bool - switch mpChecksumType { - case types.ChecksumTypeFullObject: - composableCRC = utils.IsChecksumComposable(checksums.Algorithm) - } - // The 
initialie values is the lower limit of partNumber: 0 var partNumber int32 for i, part := range parts { @@ -1746,17 +1736,9 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C return res, "", s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize) } - var hashRdr *utils.HashReader var compositeChecksumRdr *utils.CompositeChecksumReader - switch checksums.Type { - case types.ChecksumTypeFullObject: - if !composableCRC { - hashRdr, err = utils.NewHashReader(nil, "", utils.HashType(strings.ToLower(string(checksums.Algorithm)))) - if err != nil { - return res, "", fmt.Errorf("initialize hash reader: %w", err) - } - } - case types.ChecksumTypeComposite: + if checksums.Type == types.ChecksumTypeComposite { + // initialize the composite checksum reader if the mp checksum type is COMPOSITE compositeChecksumRdr, err = utils.NewCompositeChecksumReader(utils.HashType(strings.ToLower(string(checksums.Algorithm)))) if err != nil { return res, "", fmt.Errorf("initialize composite checksum reader: %w", err) @@ -1787,26 +1769,33 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C return res, "", fmt.Errorf("stat part %v: %v", *part.PartNumber, err) } - var rdr io.Reader = pf switch checksums.Type { case types.ChecksumTypeFullObject: - if composableCRC { - if i == 0 { - composableCsum = getPartChecksum(checksums.Algorithm, part) - break - } - composableCsum, err = utils.AddCRCChecksum(checksums.Algorithm, - composableCsum, getPartChecksum(checksums.Algorithm, part), - pfi.Size()) + var partChecksum string + if mpChecksumType != "" { + // if any checksum has been initially specified on mp creation + // read the part checksum configuration + partChecksum = getPartChecksum(checksums.Algorithm, part) + } else { + // if no checksum has been specified on mp creation + // retrieve the internally stored crc64nvme + crc64nvme, err := p.meta.RetrieveAttribute(pf, bucket, partObjPath, partCrc64nvme) if err != nil { 
pf.Close() - return res, "", fmt.Errorf("add part %v checksum: %w", - *part.PartNumber, err) + return res, "", fmt.Errorf("retrieve part internal crc64nvme: %s", err) } + partChecksum = string(crc64nvme) + } + if i == 0 { + composableCsum = partChecksum break } - hashRdr.SetReader(rdr) - rdr = hashRdr + composableCsum, err = utils.AddCRCChecksum(checksums.Algorithm, composableCsum, partChecksum, pfi.Size()) + if err != nil { + pf.Close() + return res, "", fmt.Errorf("add part %v checksum: %w", + *part.PartNumber, err) + } case types.ChecksumTypeComposite: err := compositeChecksumRdr.Process(getPartChecksum(checksums.Algorithm, part)) if err != nil { @@ -1824,10 +1813,10 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C bucket, object, err) fw := f.File() fw.Seek(0, io.SeekEnd) - _, err = io.Copy(fw, rdr) + _, err = io.Copy(fw, pf) } } else { - _, err = io.Copy(f.File(), rdr) + _, err = io.Copy(f.File(), pf) } pf.Close() if err != nil { @@ -1926,11 +1915,7 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C case types.ChecksumTypeComposite: value = fmt.Sprintf("%s-%v", compositeChecksumRdr.Sum(), len(parts)) case types.ChecksumTypeFullObject: - if !composableCRC { - value = hashRdr.Sum() - } else { - value = composableCsum - } + value = composableCsum } var gotSum *string @@ -2641,7 +2626,10 @@ func (p *Posix) UploadPartWithPostFunc(ctx context.Context, input *s3.UploadPart chRdr, chunkUpload := input.Body.(middlewares.ChecksumReader) isTrailingChecksum := chunkUpload && chRdr.Algorithm() != "" - var hashRdr *utils.HashReader + // user input checksum algorithm: either with chunk uploads or with request headers + var inputChAlgo utils.HashType + // user input checksum value specified with request headers + var inputSum string if !isTrailingChecksum { hashConfigs := []hashConfig{ @@ -2654,27 +2642,20 @@ func (p *Posix) UploadPartWithPostFunc(ctx context.Context, input *s3.UploadPart for _, config := 
range hashConfigs { if config.value != nil { - hashRdr, err = utils.NewHashReader(tr, *config.value, config.hashType) - if err != nil { - return nil, fmt.Errorf("initialize hash reader: %w", err) - } - - tr = hashRdr + inputChAlgo = config.hashType + inputSum = *config.value + break } } - } - - checksums, chErr := p.retrieveChecksums(nil, bucket, mpPath) - if chErr != nil && !errors.Is(chErr, meta.ErrNoSuchKey) { - return nil, fmt.Errorf("retreive mp checksum: %w", chErr) - } - - var inputChAlgo utils.HashType - if isTrailingChecksum { + } else { inputChAlgo = utils.HashType(chRdr.Algorithm()) } - if hashRdr != nil { - inputChAlgo = hashRdr.Type() + + exposeChecksum := inputChAlgo != "" + + checksums, err := p.retrieveChecksums(nil, bucket, mpPath) + if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { + return nil, fmt.Errorf("retreive mp checksum: %w", err) } // If checksum isn't provided for the part, @@ -2693,18 +2674,53 @@ func (p *Posix) UploadPartWithPostFunc(ctx context.Context, input *s3.UploadPart } } - // if no checksum algorithm or precalculated checksum is - // provided, but one has been on multipart upload initialization, - // anyways calculate and store the uploaded part checksum - if inputChAlgo == "" && checksums.Algorithm != "" { - hashType := utils.HashType(strings.ToLower(string(checksums.Algorithm))) - hashRdr, err = utils.NewHashReader(tr, "", hashType) - if err != nil { - return nil, fmt.Errorf("initialize hash reader: %w", err) - } - inputChAlgo = hashType + if inputChAlgo == "" { + // default to crc64nvme + inputChAlgo = utils.HashTypeCRC64NVME + } - tr = hashRdr + // hashreader is responsible to calculate and validate + // user input checksums + var hashRdr *utils.HashReader + // crc64nvmeRdr is used to calculate the object crc64nvme + // only for internal usage + var crc64nvmeRdr *utils.HashReader + + if checksums.Type == "" { + if inputChAlgo != utils.HashTypeCRC64NVME { + // if the input checksum algorithm isn't crc64nvme, create a + 
// crc64nvme reader no matter if checksum is + // received from chunk reader or request headers. + crc64nvmeRdr, err = utils.NewHashReader(tr, "", utils.HashTypeCRC64NVME) + if err != nil { + return nil, fmt.Errorf("initialize crc64nvme hash reader: %w", err) + } + tr = crc64nvmeRdr + } + + if !isTrailingChecksum { + // create a new hash reader for the user input checksum for calculation + // if the checksum doesn't come from chunk readers + hashRdr, err = utils.NewHashReader(tr, inputSum, inputChAlgo) + if err != nil { + return nil, fmt.Errorf("initialize hash reader: %w", err) + } + + tr = hashRdr + } + } else { + // if no checksum algorithm or precalculated checksum is + // provided, but one has been on multipart upload initialization, + // anyways calculate and store the uploaded part checksum + if !isTrailingChecksum { + chAlgo := utils.HashType(strings.ToLower(string(checksums.Algorithm))) + hashRdr, err = utils.NewHashReader(tr, inputSum, chAlgo) + if err != nil { + return nil, fmt.Errorf("initialize hash reader: %w", err) + } + + tr = hashRdr + } } _, err = io.Copy(f, tr) @@ -2729,9 +2745,14 @@ func (p *Posix) UploadPartWithPostFunc(ctx context.Context, input *s3.UploadPart ETag: &etag, } - if inputChAlgo != "" { + // if a checksum algorithm has been provided on mp initiation + // the checksums should be stored, otherwise only returned + // in the response without storing + storeChecksum := checksums.Type != "" + + if storeChecksum { checksum := s3response.Checksum{ - Algorithm: input.ChecksumAlgorithm, + Algorithm: checksums.Algorithm, } var sum string @@ -2743,30 +2764,64 @@ func (p *Posix) UploadPartWithPostFunc(ctx context.Context, input *s3.UploadPart } // Assign the checksum - switch inputChAlgo { - case utils.HashTypeCRC32: + switch checksums.Algorithm { + case types.ChecksumAlgorithmCrc32: checksum.CRC32 = &sum res.ChecksumCRC32 = &sum - case utils.HashTypeCRC32C: + case types.ChecksumAlgorithmCrc32c: checksum.CRC32C = &sum res.ChecksumCRC32C = &sum 
- case utils.HashTypeSha1: + case types.ChecksumAlgorithmSha1: checksum.SHA1 = &sum res.ChecksumSHA1 = &sum - case utils.HashTypeSha256: + case types.ChecksumAlgorithmSha256: checksum.SHA256 = &sum res.ChecksumSHA256 = &sum - case utils.HashTypeCRC64NVME: + case types.ChecksumAlgorithmCrc64nvme: checksum.CRC64NVME = &sum res.ChecksumCRC64NVME = &sum } - // Store the checksums if the checksum type has been - // specified on mp initialization - if checksums.Type != "" { - err := p.storeChecksums(f.File(), bucket, partPath, checksum) - if err != nil { - return nil, fmt.Errorf("store checksum: %w", err) + err := p.storeChecksums(f.File(), bucket, partPath, checksum) + if err != nil { + return nil, fmt.Errorf("store checksum: %w", err) + } + } else { + var internalCrc64NvmeSum string + if inputChAlgo == utils.HashTypeCRC64NVME { + if isTrailingChecksum { + internalCrc64NvmeSum = chRdr.Checksum() + } else { + internalCrc64NvmeSum = hashRdr.Sum() + } + } else { + internalCrc64NvmeSum = crc64nvmeRdr.Sum() + } + + err := p.meta.StoreAttribute(f.File(), bucket, partPath, partCrc64nvme, []byte(internalCrc64NvmeSum)) + if err != nil { + return nil, fmt.Errorf("store part internal crc64nvme: %w", err) + } + + if exposeChecksum { + var sumToReturn string + if isTrailingChecksum { + sumToReturn = chRdr.Checksum() + } else { + sumToReturn = hashRdr.Sum() + } + + switch inputChAlgo { + case utils.HashTypeCRC32: + res.ChecksumCRC32 = &sumToReturn + case utils.HashTypeCRC32C: + res.ChecksumCRC32C = &sumToReturn + case utils.HashTypeSha1: + res.ChecksumSHA1 = &sumToReturn + case utils.HashTypeSha256: + res.ChecksumSHA256 = &sumToReturn + case utils.HashTypeCRC64NVME: + res.ChecksumCRC64NVME = &sumToReturn } } } @@ -2937,6 +2992,7 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput) // TODO: Should the checksum be recalculated or just copied ? 
var hashRdr *utils.HashReader + var crc64nvmeRdr *utils.HashReader if mpChecksums.Algorithm != "" { if checksums.Algorithm == "" || mpChecksums.Algorithm != checksums.Algorithm { hashRdr, err = utils.NewHashReader(tr, "", utils.HashType(strings.ToLower(string(mpChecksums.Algorithm)))) @@ -2946,6 +3002,14 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput) tr = hashRdr } + } else { + // if no checksum has been specified on multipart upload initiation + // create an internal crc64nvme reader to calculate and store the internal crc64nvme + crc64nvmeRdr, err = utils.NewHashReader(tr, "", utils.HashTypeCRC64NVME) + if err != nil { + return s3response.CopyPartResult{}, fmt.Errorf("initialize internal crc64nvme reader: %w", err) + } + tr = crc64nvmeRdr } _, err = io.Copy(f, tr) @@ -2968,6 +3032,7 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput) } } } + if hashRdr != nil { algo := types.ChecksumAlgorithm(strings.ToUpper(string(hashRdr.Type()))) checksums = s3response.Checksum{ @@ -2994,6 +3059,15 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput) } } + if crc64nvmeRdr != nil { + // store the internal crc64nvme + internalCrc64NvmeSum := crc64nvmeRdr.Sum() + err := p.meta.StoreAttribute(f.File(), objPath, "", partCrc64nvme, []byte(internalCrc64NvmeSum)) + if err != nil { + return s3response.CopyPartResult{}, fmt.Errorf("store part internal crc64nvme: %w", err) + } + } + etag := backend.GenerateEtag(hash) err = p.meta.StoreAttribute(f.File(), *upi.Bucket, partPath, etagkey, []byte(etag)) if err != nil {