diff --git a/cmd/api-response.go b/cmd/api-response.go index db93d30d6..cb279cb09 100644 --- a/cmd/api-response.go +++ b/cmd/api-response.go @@ -791,7 +791,7 @@ func generateInitiateMultipartUploadResponse(bucket, key, uploadID string) Initi // generates CompleteMultipartUploadResponse for given bucket, key, location and ETag. func generateCompleteMultipartUploadResponse(bucket, key, location string, oi ObjectInfo, h http.Header) CompleteMultipartUploadResponse { - cs := oi.decryptChecksums(0, h) + cs, _ := oi.decryptChecksums(0, h) c := CompleteMultipartUploadResponse{ Location: location, Bucket: bucket, diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 64d255f10..5083e99f3 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -622,12 +622,12 @@ func (r BatchJobReplicateV1) writeAsArchive(ctx context.Context, objAPI ObjectLa }, } - opts, err := batchReplicationOpts(ctx, "", gr.ObjInfo) + opts, _, err := batchReplicationOpts(ctx, "", gr.ObjInfo) if err != nil { batchLogIf(ctx, err) continue } - + // TODO: I am not sure we read it back, but we aren't sending whether checksums are single/multipart. for k, vals := range opts.Header() { for _, v := range vals { snowballObj.Headers.Add(k, v) @@ -712,14 +712,14 @@ func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectL return err } - putOpts, err := batchReplicationOpts(ctx, "", objInfo) + putOpts, isMP, err := batchReplicationOpts(ctx, "", objInfo) if err != nil { return err } if r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 { putOpts.Internal = miniogo.AdvancedPutOptions{} } - if objInfo.isMultipart() { + if isMP { if err := replicateObjectWithMultipart(ctx, c, tgtBucket, pathJoin(tgtPrefix, objInfo.Name), rd, objInfo, putOpts); err != nil { return err } @@ -1576,11 +1576,11 @@ func (j *BatchJobRequest) load(ctx context.Context, api ObjectLayer, name string return err } -func batchReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, err error) { +func batchReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, isMP bool, err error) { // TODO: support custom storage class for remote replication - putOpts, err = putReplicationOpts(ctx, "", objInfo, 0) + putOpts, isMP, err = putReplicationOpts(ctx, "", objInfo) if err != nil { - return putOpts, err + return putOpts, isMP, err } putOpts.Internal = miniogo.AdvancedPutOptions{ SourceVersionID: objInfo.VersionID, @@ -1588,7 +1588,7 @@ func batchReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (p SourceETag: objInfo.ETag, ReplicationRequest: true, } - return putOpts, nil + return putOpts, isMP, nil } // ListBatchJobs - lists all currently active batch jobs, optionally takes {jobType} diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index d909ff40c..24fc00fe4 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -773,7 +773,7 @@ func (m caseInsensitiveMap) Lookup(key string) (string, bool) { return "", false } -func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, partNum int) (putOpts minio.PutObjectOptions, err error) { +func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts minio.PutObjectOptions, isMP bool, err error) { meta := make(map[string]string) isSSEC := crypto.SSEC.IsEncrypted(objInfo.UserDefined) @@ -794,23 +794,22 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part meta[k] = v } } + isMP = objInfo.isMultipart() if len(objInfo.Checksum) > 0 { // Add encrypted CRC to metadata for SSE-C objects. if isSSEC { meta[ReplicationSsecChecksumHeader] = base64.StdEncoding.EncodeToString(objInfo.Checksum) } else { - if objInfo.isMultipart() && partNum > 0 { - for _, pi := range objInfo.Parts { - if pi.Number == partNum { - for k, v := range pi.Checksums { // for PutObjectPart - meta[k] = v - } - } - } - } else { - for k, v := range getCRCMeta(objInfo, 0, nil) { // for PutObject/NewMultipartUpload - meta[k] = v - } + cs, mp := getCRCMeta(objInfo, 0, nil) + // Set object checksum. + for k, v := range cs { + meta[k] = v + } + isMP = mp + if !objInfo.isMultipart() && cs[xhttp.AmzChecksumType] == xhttp.AmzChecksumTypeFullObject { + // For objects where checksum is full object, it will be the same. + // Therefore, we use the cheaper PutObject replication. + isMP = false } } } @@ -841,7 +840,7 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part if tagTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+TaggingTimestamp]; ok { tagTimestamp, err = time.Parse(time.RFC3339Nano, tagTmstampStr) if err != nil { - return putOpts, err + return putOpts, false, err } } putOpts.Internal.TaggingTimestamp = tagTimestamp @@ -865,7 +864,7 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part if retainDateStr, ok := lkMap.Lookup(xhttp.AmzObjectLockRetainUntilDate); ok { rdate, err := amztime.ISO8601Parse(retainDateStr) if err != nil { - return putOpts, err + return putOpts, false, err } putOpts.RetainUntilDate = rdate // set retention timestamp in opts @@ -873,7 +872,7 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part if retainTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp]; ok { retTimestamp, err = time.Parse(time.RFC3339Nano, retainTmstampStr) if err != nil { - return putOpts, err + return putOpts, false, err } } putOpts.Internal.RetentionTimestamp = retTimestamp @@ -885,7 +884,7 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part if lholdTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp]; ok { lholdTimestamp, err = time.Parse(time.RFC3339Nano, lholdTmstampStr) if err != nil { - return putOpts, err + return putOpts, false, err } } putOpts.Internal.LegalholdTimestamp = lholdTimestamp @@ -897,7 +896,7 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part if crypto.S3KMS.IsEncrypted(objInfo.UserDefined) { sseEnc, err := encrypt.NewSSEKMS(objInfo.KMSKeyID(), nil) if err != nil { - return putOpts, err + return putOpts, false, err } putOpts.ServerSideEncryption = sseEnc } @@ -1290,7 +1289,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj // use core client to avoid doing multipart on PUT c := &minio.Core{Client: tgt.Client} - putOpts, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo, 0) + putOpts, isMP, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo) if err != nil { replLogIf(ctx, fmt.Errorf("failure setting options for replication bucket:%s err:%w", bucket, err)) sendEvent(eventArgs{ @@ -1322,7 +1321,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj defer cancel() } r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts) - if objInfo.isMultipart() { + if isMP { rinfo.Err = replicateObjectWithMultipart(ctx, c, tgt.Bucket, object, r, objInfo, putOpts) } else { _, rinfo.Err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts) @@ -1577,8 +1576,7 @@ applyAction: replLogIf(ctx, fmt.Errorf("unable to replicate metadata for object %s/%s(%s) to target %s: %w", bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), rinfo.Err)) } } else { - var putOpts minio.PutObjectOptions - putOpts, err = putReplicationOpts(ctx, tgt.StorageClass, objInfo, 0) + putOpts, isMP, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo) if err != nil { replLogIf(ctx, fmt.Errorf("failed to set replicate options for object %s/%s(%s) (target %s) err:%w", bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), err)) sendEvent(eventArgs{ @@ -1609,7 +1607,7 @@ applyAction: defer cancel() } r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts) - if objInfo.isMultipart() { + if isMP { rinfo.Err = replicateObjectWithMultipart(ctx, c, tgt.Bucket, object, r, objInfo, putOpts) } else { _, rinfo.Err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts) @@ -1687,7 +1685,8 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob cHeader := http.Header{} cHeader.Add(xhttp.MinIOSourceReplicationRequest, "true") if !isSSEC { - for k, v := range getCRCMeta(objInfo, partInfo.Number, nil) { + cs, _ := getCRCMeta(objInfo, partInfo.Number, nil) + for k, v := range cs { cHeader.Add(k, v) } } @@ -1732,8 +1731,9 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob defer ccancel() if len(objInfo.Checksum) > 0 { - for k, v := range getCRCMeta(objInfo, 0, nil) { - userMeta[k] = v + cs, _ := getCRCMeta(objInfo, 0, nil) + for k, v := range cs { + userMeta[k] = strings.Split(v, "-")[0] } } _, err = c.CompleteMultipartUpload(cctx, bucket, object, uploadID, uploadedParts, minio.PutObjectOptions{ @@ -3782,9 +3782,9 @@ type validateReplicationDestinationOptions struct { checkReadyErr sync.Map } -func getCRCMeta(oi ObjectInfo, partNum int, h http.Header) map[string]string { +func getCRCMeta(oi ObjectInfo, partNum int, h http.Header) (cs map[string]string, isMP bool) { meta := make(map[string]string) - cs := oi.decryptChecksums(partNum, h) + cs, isMP = oi.decryptChecksums(partNum, h) for k, v := range cs { cksum := hash.NewChecksumString(k, v) if cksum == nil { @@ -3793,7 +3793,10 @@ func getCRCMeta(oi ObjectInfo, partNum int, h http.Header) map[string]string { if cksum.Valid() { meta[cksum.Type.Key()] = v } - meta[xhttp.AmzChecksumType] = cksum.Type.ObjType() + if isMP && partNum == 0 { + meta[xhttp.AmzChecksumType] = cksum.Type.ObjType() + meta[xhttp.AmzChecksumAlgo] = cksum.Type.String() + } } - return meta + return meta, isMP } diff --git a/cmd/encryption-v1.go b/cmd/encryption-v1.go index 62bdafcd3..5013c221f 100644 --- a/cmd/encryption-v1.go +++ b/cmd/encryption-v1.go @@ -1155,16 +1155,17 @@ func (o *ObjectInfo) metadataEncryptFn(headers http.Header) (objectMetaEncryptFn // decryptChecksums will attempt to decode checksums and return it/them if set. // if part > 0, and we have the checksum for the part that will be returned. -func (o *ObjectInfo) decryptChecksums(part int, h http.Header) map[string]string { +// Returns whether the checksum (main part 0) is a multipart checksum. +func (o *ObjectInfo) decryptChecksums(part int, h http.Header) (cs map[string]string, isMP bool) { data := o.Checksum if len(data) == 0 { - return nil + return nil, false } if part > 0 && !crypto.SSEC.IsEncrypted(o.UserDefined) { // already decrypted in ToObjectInfo for multipart objects for _, pi := range o.Parts { if pi.Number == part { - return pi.Checksums + return pi.Checksums, true } } } @@ -1174,7 +1175,7 @@ func (o *ObjectInfo) decryptChecksums(part int, h http.Header) map[string]string if err != crypto.ErrSecretKeyMismatch { encLogIf(GlobalContext, err) } - return nil + return nil, part > 0 } data = decrypted } diff --git a/cmd/ftp-server-driver.go b/cmd/ftp-server-driver.go index c2418f3eb..617c42230 100644 --- a/cmd/ftp-server-driver.go +++ b/cmd/ftp-server-driver.go @@ -552,6 +552,7 @@ func (driver *ftpDriver) PutFile(ctx *ftp.Context, objPath string, data io.Reade info, err := clnt.PutObject(context.Background(), bucket, object, data, -1, minio.PutObjectOptions{ ContentType: mimedb.TypeByExtension(path.Ext(object)), DisableContentSha256: true, + Checksum: minio.ChecksumFullObjectCRC32C, }) n = info.Size return n, err diff --git a/cmd/object-handlers-common.go b/cmd/object-handlers-common.go index 7645b46a4..4e351ba72 100644 --- a/cmd/object-handlers-common.go +++ b/cmd/object-handlers-common.go @@ -374,7 +374,8 @@ func setPutObjHeaders(w http.ResponseWriter, objInfo ObjectInfo, del bool, h htt lc.SetPredictionHeaders(w, objInfo.ToLifecycleOpts()) } } - hash.AddChecksumHeader(w, objInfo.decryptChecksums(0, h)) + cs, _ := objInfo.decryptChecksums(0, h) + hash.AddChecksumHeader(w, cs) } func deleteObjectVersions(ctx context.Context, o ObjectLayer, bucket string, toDel []ObjectToDelete, lcEvent lifecycle.Event) { diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 31a140dc0..b17a042a8 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -521,7 +521,8 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj if r.Header.Get(xhttp.AmzChecksumMode) == "ENABLED" && rs == nil { // AWS S3 silently drops checksums on range requests. - hash.AddChecksumHeader(w, objInfo.decryptChecksums(opts.PartNumber, r.Header)) + cs, _ := objInfo.decryptChecksums(opts.PartNumber, r.Header) + hash.AddChecksumHeader(w, cs) } if err = setObjectHeaders(ctx, w, objInfo, rs, opts); err != nil { @@ -632,7 +633,7 @@ func (api objectAPIHandlers) getObjectAttributesHandler(ctx context.Context, obj w.Header().Del(xhttp.ContentType) if _, ok := opts.ObjectAttributes[xhttp.Checksum]; ok { - chkSums := objInfo.decryptChecksums(0, r.Header) + chkSums, _ := objInfo.decryptChecksums(0, r.Header) // AWS does not appear to append part number on this API call. if len(chkSums) > 0 { OA.Checksum = &objectAttributesChecksum{ @@ -945,7 +946,8 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob if r.Header.Get(xhttp.AmzChecksumMode) == "ENABLED" && rs == nil { // AWS S3 silently drops checksums on range requests. - hash.AddChecksumHeader(w, objInfo.decryptChecksums(opts.PartNumber, r.Header)) + cs, _ := objInfo.decryptChecksums(opts.PartNumber, r.Header) + hash.AddChecksumHeader(w, cs) } // Set standard object headers. diff --git a/cmd/sftp-server-driver.go b/cmd/sftp-server-driver.go index 33ddf6b66..31b0407ea 100644 --- a/cmd/sftp-server-driver.go +++ b/cmd/sftp-server-driver.go @@ -289,6 +289,7 @@ func (f *sftpDriver) Filewrite(r *sftp.Request) (w io.WriterAt, err error) { oi, err := clnt.PutObject(r.Context(), bucket, object, pr, -1, minio.PutObjectOptions{ ContentType: mimedb.TypeByExtension(path.Ext(object)), DisableContentSha256: true, + Checksum: minio.ChecksumFullObjectCRC32C, }) stopFn(oi.Size, err) pr.CloseWithError(err) diff --git a/internal/hash/checksum.go b/internal/hash/checksum.go index 32772fc07..4e4c621c2 100644 --- a/internal/hash/checksum.go +++ b/internal/hash/checksum.go @@ -203,6 +203,24 @@ func (c ChecksumType) String() string { return "invalid" } +// StringFull returns the type and all flags as a string. +func (c ChecksumType) StringFull() string { + out := []string{c.String()} + if c.Is(ChecksumMultipart) { + out = append(out, "MULTIPART") + } + if c.Is(ChecksumIncludesMultipart) { + out = append(out, "INCLUDESMP") + } + if c.Is(ChecksumTrailing) { + out = append(out, "TRAILING") + } + if c.Is(ChecksumFullObject) { + out = append(out, "FULLOBJ") + } + return strings.Join(out, "|") +} + // FullObjectRequested will return if the checksum type indicates full object checksum was requested. func (c ChecksumType) FullObjectRequested() bool { return c&(ChecksumFullObject) == ChecksumFullObject || c.Is(ChecksumCRC64NVME) @@ -263,7 +281,8 @@ func NewChecksumFromData(t ChecksumType, data []byte) *Checksum { } // ReadCheckSums will read checksums from b and return them. -func ReadCheckSums(b []byte, part int) map[string]string { +// Returns whether this is (part of) a multipart checksum. +func ReadCheckSums(b []byte, part int) (cs map[string]string, isMP bool) { res := make(map[string]string, 1) for len(b) > 0 { t, n := binary.Uvarint(b) @@ -277,9 +296,11 @@ func ReadCheckSums(b []byte, part int) map[string]string { if length == 0 || len(b) < length { break } + cs := base64.StdEncoding.EncodeToString(b[:length]) b = b[length:] if typ.Is(ChecksumMultipart) { + isMP = true t, n := binary.Uvarint(b) if n < 0 { break @@ -317,7 +338,7 @@ func ReadCheckSums(b []byte, part int) map[string]string { if len(res) == 0 { res = nil } - return res + return res, isMP } // ReadPartCheckSums will read all part checksums from b and return them.