diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 2126de8f2..044e27a41 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -1476,7 +1476,7 @@ func (j *BatchJobRequest) load(ctx context.Context, api ObjectLayer, name string func batchReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, err error) { // TODO: support custom storage class for remote replication - putOpts, err = putReplicationOpts(ctx, "", objInfo) + putOpts, err = putReplicationOpts(ctx, "", objInfo, 0) if err != nil { return putOpts, err } diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 5ec05281b..db214a0ef 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -763,12 +763,32 @@ func (m caseInsensitiveMap) Lookup(key string) (string, bool) { return "", false } -func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts minio.PutObjectOptions, err error) { +func getCRCMeta(oi ObjectInfo, partNum int) map[string]string { + meta := make(map[string]string) + cs := oi.decryptChecksums(partNum) + for k, v := range cs { + cksum := hash.NewChecksumString(k, v) + if cksum == nil { + continue + } + if cksum.Valid() { + meta[cksum.Type.Key()] = v + } + } + return meta +} + +func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, partNum int) (putOpts minio.PutObjectOptions, err error) { meta := make(map[string]string) for k, v := range objInfo.UserDefined { // In case of SSE-C objects copy the allowed internal headers as well if !crypto.SSEC.IsEncrypted(objInfo.UserDefined) || !slices.Contains(maps.Keys(validSSEReplicationHeaders), k) { if stringsHasPrefixFold(k, ReservedMetadataPrefixLower) { + if strings.EqualFold(k, ReservedMetadataPrefixLower+"crc") { + for k, v := range getCRCMeta(objInfo, partNum) { + meta[k] = v + } + } continue } if isStandardHeader(k) { @@ -782,6 +802,12 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (put } } + if len(objInfo.Checksum) > 0 { + for k, v := range getCRCMeta(objInfo, 0) { + meta[k] = v + } + } + if sc == "" && (objInfo.StorageClass == storageclass.STANDARD || objInfo.StorageClass == storageclass.RRS) { sc = objInfo.StorageClass } @@ -1239,7 +1265,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj // use core client to avoid doing multipart on PUT c := &minio.Core{Client: tgt.Client} - putOpts, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo) + putOpts, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo, 0) if err != nil { replLogIf(ctx, fmt.Errorf("failure setting options for replication bucket:%s err:%w", bucket, err)) sendEvent(eventArgs{ @@ -1506,7 +1532,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object } } else { var putOpts minio.PutObjectOptions - putOpts, err = putReplicationOpts(ctx, tgt.StorageClass, objInfo) + putOpts, err = putReplicationOpts(ctx, tgt.StorageClass, objInfo, 0) if err != nil { replLogIf(ctx, fmt.Errorf("failed to set replicate options for object %s/%s(%s) (target %s) err:%w", bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), err)) sendEvent(eventArgs{ @@ -1614,6 +1640,10 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob cHeader := http.Header{} cHeader.Add(xhttp.MinIOSourceReplicationRequest, "true") + crc := getCRCMeta(objInfo, partInfo.Number) + for k, v := range crc { + cHeader.Add(k, v) + } popts := minio.PutObjectPartOptions{ SSE: opts.ServerSideEncryption, CustomHeader: cHeader, @@ -1633,8 +1663,12 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob return fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, partInfo.ActualSize) } uploadedParts = append(uploadedParts, minio.CompletePart{ - PartNumber: pInfo.PartNumber, - ETag: pInfo.ETag, + PartNumber: pInfo.PartNumber, + ETag: pInfo.ETag, + ChecksumCRC32: pInfo.ChecksumCRC32, + ChecksumCRC32C: pInfo.ChecksumCRC32C, + ChecksumSHA1: pInfo.ChecksumSHA1, + ChecksumSHA256: pInfo.ChecksumSHA256, }) } diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 6700e2ae1..d861515c7 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -1296,7 +1296,8 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str if checksumType.IsSet() { checksumType |= hash.ChecksumMultipart | hash.ChecksumIncludesMultipart - cs := hash.NewChecksumFromData(checksumType, checksumCombined) + var cs *hash.Checksum + cs = hash.NewChecksumFromData(checksumType, checksumCombined) fi.Checksum = cs.AppendTo(nil, checksumCombined) if opts.EncryptFn != nil { fi.Checksum = opts.EncryptFn("object-checksum", fi.Checksum) diff --git a/cmd/object-multipart-handlers.go b/cmd/object-multipart-handlers.go index ed6d3d56e..d80ed5ca7 100644 --- a/cmd/object-multipart-handlers.go +++ b/cmd/object-multipart-handlers.go @@ -773,6 +773,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http _, isEncrypted := crypto.IsEncrypted(mi.UserDefined) _, replicationStatus := mi.UserDefined[xhttp.AmzBucketReplicationStatus] + _, sourceReplReq := r.Header[xhttp.MinIOSourceReplicationRequest] var objectEncryptionKey crypto.ObjectKey if isEncrypted { if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) && !replicationStatus { @@ -795,7 +796,6 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http } } - _, sourceReplReq := r.Header[xhttp.MinIOSourceReplicationRequest] if !(sourceReplReq && crypto.SSEC.IsEncrypted(mi.UserDefined)) { // Calculating object encryption key key, err = decryptObjectMeta(key, bucket, object, mi.UserDefined) @@ -847,6 +847,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http } opts.IndexCB = idxCb + opts.ReplicationRequest = sourceReplReq putObjectPart := objectAPI.PutObjectPart partInfo, err := putObjectPart(ctx, bucket, object, uploadID, partID, pReader, opts)