diff --git a/cmd/api-response.go b/cmd/api-response.go index 447a1e732..7f0641bb2 100644 --- a/cmd/api-response.go +++ b/cmd/api-response.go @@ -208,6 +208,13 @@ type CopyObjectResponse struct { ETag string // md5sum of the copied object. } +// CopyObjectPartResponse container returns ETag and LastModified of the successfully copied object +type CopyObjectPartResponse struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CopyPartResult" json:"-"` + LastModified string // time string of format "2006-01-02T15:04:05.000Z" + ETag string // md5sum of the copied object part. +} + // Initiator inherit from Owner struct, fields are same type Initiator Owner @@ -399,6 +406,14 @@ func generateCopyObjectResponse(etag string, lastModified time.Time) CopyObjectR } } +// generates CopyObjectPartResponse from etag and lastModified time. +func generateCopyObjectPartResponse(etag string, lastModified time.Time) CopyObjectPartResponse { + return CopyObjectPartResponse{ + ETag: "\"" + etag + "\"", + LastModified: lastModified.UTC().Format(timeFormatAMZLong), + } +} + // generates InitiateMultipartUploadResponse for given bucket, key and uploadID. func generateInitiateMultipartUploadResponse(bucket, key, uploadID string) InitiateMultipartUploadResponse { return InitiateMultipartUploadResponse{ diff --git a/cmd/api-router.go b/cmd/api-router.go index d92aaaa06..b9c76f772 100644 --- a/cmd/api-router.go +++ b/cmd/api-router.go @@ -40,6 +40,8 @@ func registerAPIRouter(mux *router.Router) { // HeadObject bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(api.HeadObjectHandler) + // CopyObjectPart + bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(api.CopyObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}") // PutObjectPart bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(api.PutObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}") // ListObjectPxarts diff --git a/cmd/benchmark-utils_test.go b/cmd/benchmark-utils_test.go index 079fd0cfb..bcd231568 100644 --- a/cmd/benchmark-utils_test.go +++ b/cmd/benchmark-utils_test.go @@ -133,11 +133,12 @@ func runPutObjectPartBenchmark(b *testing.B, obj ObjectLayer, partSize int) { } metadata := make(map[string]string) metadata["md5Sum"] = getMD5Hash([]byte(textPartData)) - md5Sum, err = obj.PutObjectPart(bucket, object, uploadID, j, int64(len(textPartData)), bytes.NewBuffer(textPartData), metadata["md5Sum"], sha256sum) + var partInfo PartInfo + partInfo, err = obj.PutObjectPart(bucket, object, uploadID, j, int64(len(textPartData)), bytes.NewBuffer(textPartData), metadata["md5Sum"], sha256sum) if err != nil { b.Fatal(err) } - if md5Sum != metadata["md5Sum"] { + if partInfo.ETag != metadata["md5Sum"] { b.Fatalf("Write no: %d: Md5Sum mismatch during object write into the bucket: Expected %s, got %s", i+1, md5Sum, metadata["md5Sum"]) } } diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index bc4d398fd..4ae57635c 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -451,17 +451,49 @@ func partToAppend(fsMeta fsMetaV1, fsAppendMeta fsMetaV1) (part objectPartInfo, return fsMeta.Parts[nextPartIndex], true } +// CopyObjectPart - similar to PutObjectPart but reads data from an existing +// object. Internally incoming data is written to '.minio.sys/tmp' location +// and safely renamed to '.minio.sys/multipart' for reach parts. +func (fs fsObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64) (PartInfo, error) { + if err := checkNewMultipartArgs(srcBucket, srcObject, fs); err != nil { + return PartInfo{}, err + } + + // Initialize pipe. + pipeReader, pipeWriter := io.Pipe() + + go func() { + startOffset := int64(0) // Read the whole file. + if gerr := fs.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter); gerr != nil { + errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject) + pipeWriter.CloseWithError(gerr) + return + } + pipeWriter.Close() // Close writer explicitly signalling we wrote all data. + }() + + partInfo, err := fs.PutObjectPart(dstBucket, dstObject, uploadID, partID, length, pipeReader, "", "") + if err != nil { + return PartInfo{}, toObjectErr(err, dstBucket, dstObject) + } + + // Explicitly close the reader. + pipeReader.Close() + + return partInfo, nil +} + // PutObjectPart - reads incoming data until EOF for the part file on // an ongoing multipart transaction. Internally incoming data is // written to '.minio.sys/tmp' location and safely renamed to // '.minio.sys/multipart' for reach parts. -func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (string, error) { +func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (PartInfo, error) { if err := checkPutObjectPartArgs(bucket, object, fs); err != nil { - return "", err + return PartInfo{}, err } if _, err := fs.statBucketDir(bucket); err != nil { - return "", toObjectErr(err, bucket) + return PartInfo{}, toObjectErr(err, bucket) } // Hold the lock so that two parallel complete-multipart-uploads @@ -474,9 +506,9 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s uploadsPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object, uploadsJSONFile) if _, err := fs.rwPool.Open(uploadsPath); err != nil { if err == errFileNotFound || err == errFileAccessDenied { - return "", traceError(InvalidUploadID{UploadID: uploadID}) + return PartInfo{}, traceError(InvalidUploadID{UploadID: uploadID}) } - return "", toObjectErr(traceError(err), bucket, object) + return PartInfo{}, toObjectErr(traceError(err), bucket, object) } defer fs.rwPool.Close(uploadsPath) @@ -487,16 +519,16 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s rwlk, err := fs.rwPool.Write(fsMetaPath) if err != nil { if err == errFileNotFound || err == errFileAccessDenied { - return "", traceError(InvalidUploadID{UploadID: uploadID}) + return PartInfo{}, traceError(InvalidUploadID{UploadID: uploadID}) } - return "", toObjectErr(traceError(err), bucket, object) + return PartInfo{}, toObjectErr(traceError(err), bucket, object) } defer rwlk.Close() fsMeta := fsMetaV1{} _, err = fsMeta.ReadFrom(rwlk) if err != nil { - return "", toObjectErr(err, minioMetaMultipartBucket, fsMetaPath) + return PartInfo{}, toObjectErr(err, minioMetaMultipartBucket, fsMetaPath) } partSuffix := fmt.Sprintf("object%d", partID) @@ -534,14 +566,14 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s bytesWritten, cErr := fsCreateFile(fsPartPath, teeReader, buf, size) if cErr != nil { fsRemoveFile(fsPartPath) - return "", toObjectErr(cErr, minioMetaTmpBucket, tmpPartPath) + return PartInfo{}, toObjectErr(cErr, minioMetaTmpBucket, tmpPartPath) } // Should return IncompleteBody{} error when reader has fewer // bytes than specified in request header. if bytesWritten < size { fsRemoveFile(fsPartPath) - return "", traceError(IncompleteBody{}) + return PartInfo{}, traceError(IncompleteBody{}) } // Delete temporary part in case of failure. If @@ -552,14 +584,14 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) if md5Hex != "" { if newMD5Hex != md5Hex { - return "", traceError(BadDigest{md5Hex, newMD5Hex}) + return PartInfo{}, traceError(BadDigest{md5Hex, newMD5Hex}) } } if sha256sum != "" { newSHA256sum := hex.EncodeToString(sha256Writer.Sum(nil)) if newSHA256sum != sha256sum { - return "", traceError(SHA256Mismatch{}) + return PartInfo{}, traceError(SHA256Mismatch{}) } } @@ -572,14 +604,20 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s fsNSPartPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, partPath) if err = fsRenameFile(fsPartPath, fsNSPartPath); err != nil { partLock.Unlock() - return "", toObjectErr(err, minioMetaMultipartBucket, partPath) + return PartInfo{}, toObjectErr(err, minioMetaMultipartBucket, partPath) } // Save the object part info in `fs.json`. fsMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size) if _, err = fsMeta.WriteTo(rwlk); err != nil { partLock.Unlock() - return "", toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) + return PartInfo{}, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) + } + + partNamePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, partSuffix) + fi, err := fsStatFile(partNamePath) + if err != nil { + return PartInfo{}, toObjectErr(err, minioMetaMultipartBucket, partSuffix) } // Append the part in background. @@ -593,7 +631,12 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s partLock.Unlock() }() - return newMD5Hex, nil + return PartInfo{ + PartNumber: partID, + LastModified: fi.ModTime(), + ETag: newMD5Hex, + Size: fi.Size(), + }, nil } // listObjectParts - wrapper scanning through @@ -635,7 +678,7 @@ func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberM if err != nil { return ListPartsInfo{}, toObjectErr(err, minioMetaMultipartBucket, partNamePath) } - result.Parts = append(result.Parts, partInfo{ + result.Parts = append(result.Parts, PartInfo{ PartNumber: part.Number, ETag: part.ETag, LastModified: fi.ModTime(), diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index 0da114699..cdb052f29 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -145,7 +145,7 @@ type ListPartsInfo struct { IsTruncated bool // List of all parts. - Parts []partInfo + Parts []PartInfo EncodingType string // Not supported yet. } @@ -220,8 +220,8 @@ type ListObjectsInfo struct { Prefixes []string } -// partInfo - represents individual part metadata. -type partInfo struct { +// PartInfo - represents individual part metadata. +type PartInfo struct { // Part number that identifies the part. This is a positive integer between // 1 and 10,000. PartNumber int diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 6687d9594..4938af274 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -41,7 +41,8 @@ type ObjectLayer interface { // Multipart operations. ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) NewMultipartUpload(bucket, object string, metadata map[string]string) (uploadID string, err error) - PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (md5 string, err error) + CopyObjectPart(srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int, startOffset int64, length int64) (info PartInfo, err error) + PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (info PartInfo, err error) ListObjectParts(bucket, object, uploadID string, partNumberMarker int, maxParts int) (result ListPartsInfo, err error) AbortMultipartUpload(bucket, object, uploadID string) error CompleteMultipartUpload(bucket, object, uploadID string, uploadedParts []completePart) (objInfo ObjectInfo, err error) diff --git a/cmd/object-api-multipart_test.go b/cmd/object-api-multipart_test.go index 5e0146f3e..5feee0363 100644 --- a/cmd/object-api-multipart_test.go +++ b/cmd/object-api-multipart_test.go @@ -346,7 +346,7 @@ func testObjectAPIPutObjectPart(obj ObjectLayer, instanceType string, t TestErrH // Validate all the test cases. for i, testCase := range testCases { - actualMd5Hex, actualErr := obj.PutObjectPart(testCase.bucketName, testCase.objName, testCase.uploadID, testCase.PartID, testCase.intputDataSize, bytes.NewBufferString(testCase.inputReaderData), testCase.inputMd5, testCase.inputSHA256) + actualInfo, actualErr := obj.PutObjectPart(testCase.bucketName, testCase.objName, testCase.uploadID, testCase.PartID, testCase.intputDataSize, bytes.NewBufferString(testCase.inputReaderData), testCase.inputMd5, testCase.inputSHA256) // All are test cases above are expected to fail. if actualErr != nil && testCase.shouldPass { t.Errorf("Test %d: %s: Expected to pass, but failed with: %s.", i+1, instanceType, actualErr.Error()) @@ -363,8 +363,8 @@ func testObjectAPIPutObjectPart(obj ObjectLayer, instanceType string, t TestErrH // Test passes as expected, but the output values are verified for correctness here. if actualErr == nil && testCase.shouldPass { // Asserting whether the md5 output is correct. - if testCase.inputMd5 != actualMd5Hex { - t.Errorf("Test %d: %s: Calculated Md5 different from the actual one %s.", i+1, instanceType, actualMd5Hex) + if testCase.inputMd5 != actualInfo.ETag { + t.Errorf("Test %d: %s: Calculated Md5 different from the actual one %s.", i+1, instanceType, actualInfo.ETag) } } } @@ -1344,7 +1344,7 @@ func testListObjectPartsDiskNotFound(obj ObjectLayer, instanceType string, disks Object: objectNames[0], MaxParts: 10, UploadID: uploadIDs[0], - Parts: []partInfo{ + Parts: []PartInfo{ { PartNumber: 1, Size: 4, @@ -1375,7 +1375,7 @@ func testListObjectPartsDiskNotFound(obj ObjectLayer, instanceType string, disks NextPartNumberMarker: 3, IsTruncated: true, UploadID: uploadIDs[0], - Parts: []partInfo{ + Parts: []PartInfo{ { PartNumber: 1, Size: 4, @@ -1400,7 +1400,7 @@ func testListObjectPartsDiskNotFound(obj ObjectLayer, instanceType string, disks MaxParts: 2, IsTruncated: false, UploadID: uploadIDs[0], - Parts: []partInfo{ + Parts: []PartInfo{ { PartNumber: 4, Size: 4, @@ -1581,7 +1581,7 @@ func testListObjectParts(obj ObjectLayer, instanceType string, t TestErrHandler) Object: objectNames[0], MaxParts: 10, UploadID: uploadIDs[0], - Parts: []partInfo{ + Parts: []PartInfo{ { PartNumber: 1, Size: 4, @@ -1612,7 +1612,7 @@ func testListObjectParts(obj ObjectLayer, instanceType string, t TestErrHandler) NextPartNumberMarker: 3, IsTruncated: true, UploadID: uploadIDs[0], - Parts: []partInfo{ + Parts: []PartInfo{ { PartNumber: 1, Size: 4, @@ -1637,7 +1637,7 @@ func testListObjectParts(obj ObjectLayer, instanceType string, t TestErrHandler) MaxParts: 2, IsTruncated: false, UploadID: uploadIDs[0], - Parts: []partInfo{ + Parts: []PartInfo{ { PartNumber: 4, Size: 4, diff --git a/cmd/object-handlers-common.go b/cmd/object-handlers-common.go index 46fbe1ec8..dee33a0d5 100644 --- a/cmd/object-handlers-common.go +++ b/cmd/object-handlers-common.go @@ -22,6 +22,16 @@ import ( "time" ) +// Validates the preconditions for CopyObjectPart, returns true if CopyObjectPart +// operation should not proceed. Preconditions supported are: +// x-amz-copy-source-if-modified-since +// x-amz-copy-source-if-unmodified-since +// x-amz-copy-source-if-match +// x-amz-copy-source-if-none-match +func checkCopyObjectPartPreconditions(w http.ResponseWriter, r *http.Request, objInfo ObjectInfo) bool { + return checkCopyObjectPreconditions(w, r, objInfo) +} + // Validates the preconditions for CopyObject, returns true if CopyObject operation should not proceed. // Preconditions supported are: // x-amz-copy-source-if-modified-since diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index d7c2bec79..a3ef52f9f 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -531,7 +531,117 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r writeSuccessResponseXML(w, encodedSuccessResponse) } -// PutObjectPartHandler - Upload part +// CopyObjectPartHandler - uploads a part by copying data from an existing object as data source. +func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + dstBucket := vars["bucket"] + dstObject := vars["object"] + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(w, ErrServerNotInitialized, r.URL) + return + } + + if s3Error := checkRequestAuthType(r, dstBucket, "s3:PutObject", serverConfig.GetRegion()); s3Error != ErrNone { + writeErrorResponse(w, s3Error, r.URL) + return + } + + // Copy source path. + cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source")) + if err != nil { + // Save unescaped string as is. + cpSrcPath = r.Header.Get("X-Amz-Copy-Source") + } + + srcBucket, srcObject := path2BucketAndObject(cpSrcPath) + // If source object is empty or bucket is empty, reply back invalid copy source. + if srcObject == "" || srcBucket == "" { + writeErrorResponse(w, ErrInvalidCopySource, r.URL) + return + } + + uploadID := r.URL.Query().Get("uploadId") + partIDString := r.URL.Query().Get("partNumber") + + partID, err := strconv.Atoi(partIDString) + if err != nil { + writeErrorResponse(w, ErrInvalidPart, r.URL) + return + } + + // check partID with maximum part ID for multipart objects + if isMaxPartID(partID) { + writeErrorResponse(w, ErrInvalidMaxParts, r.URL) + return + } + + // Hold read locks on source object only if we are + // going to read data from source object. + objectSRLock := globalNSMutex.NewNSLock(srcBucket, srcObject) + objectSRLock.RLock() + defer objectSRLock.RUnlock() + + objInfo, err := objectAPI.GetObjectInfo(srcBucket, srcObject) + if err != nil { + errorIf(err, "Unable to fetch object info.") + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + + // Get request range. + var hrange *httpRange + rangeHeader := r.Header.Get("x-amz-copy-source-range") + if rangeHeader != "" { + if hrange, err = parseRequestRange(rangeHeader, objInfo.Size); err != nil { + // Handle only errInvalidRange + // Ignore other parse error and treat it as regular Get request like Amazon S3. + if err == errInvalidRange { + writeErrorResponse(w, ErrInvalidRange, r.URL) + return + } + + // log the error. + errorIf(err, "Invalid request range") + } + } + + // Verify before x-amz-copy-source preconditions before continuing with CopyObject. + if checkCopyObjectPartPreconditions(w, r, objInfo) { + return + } + + // Get the object. + startOffset := int64(0) + length := objInfo.Size + if hrange != nil { + startOffset = hrange.offsetBegin + length = hrange.getLength() + } + + /// maximum copy size for multipart objects in a single operation + if isMaxObjectSize(length) { + writeErrorResponse(w, ErrEntityTooLarge, r.URL) + return + } + + // Copy source object to destination, if source and destination + // object is same then only metadata is updated. + partInfo, err := objectAPI.CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, uploadID, partID, startOffset, length) + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + + response := generateCopyObjectPartResponse(partInfo.ETag, partInfo.LastModified) + encodedSuccessResponse := encodeResponse(response) + + // Write success response. + writeSuccessResponseXML(w, encodedSuccessResponse) +} + +// PutObjectPartHandler - uploads an incoming part for an ongoing multipart operation. func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucket := vars["bucket"] @@ -590,7 +700,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http return } - var partMD5 string + var partInfo PartInfo incomingMD5 := hex.EncodeToString(md5Bytes) sha256sum := "" switch rAuthType { @@ -606,7 +716,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http return } // No need to verify signature, anonymous request access is already allowed. - partMD5, err = objectAPI.PutObjectPart(bucket, object, uploadID, partID, size, r.Body, incomingMD5, sha256sum) + partInfo, err = objectAPI.PutObjectPart(bucket, object, uploadID, partID, size, r.Body, incomingMD5, sha256sum) case authTypeStreamingSigned: // Initialize stream signature verifier. reader, s3Error := newSignV4ChunkedReader(r) @@ -615,7 +725,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http writeErrorResponse(w, s3Error, r.URL) return } - partMD5, err = objectAPI.PutObjectPart(bucket, object, uploadID, partID, size, reader, incomingMD5, sha256sum) + partInfo, err = objectAPI.PutObjectPart(bucket, object, uploadID, partID, size, reader, incomingMD5, sha256sum) case authTypeSignedV2, authTypePresignedV2: s3Error := isReqAuthenticatedV2(r) if s3Error != ErrNone { @@ -623,7 +733,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http writeErrorResponse(w, s3Error, r.URL) return } - partMD5, err = objectAPI.PutObjectPart(bucket, object, uploadID, partID, size, r.Body, incomingMD5, sha256sum) + partInfo, err = objectAPI.PutObjectPart(bucket, object, uploadID, partID, size, r.Body, incomingMD5, sha256sum) case authTypePresigned, authTypeSigned: if s3Error := reqSignatureV4Verify(r); s3Error != ErrNone { errorIf(errSignatureMismatch, dumpRequest(r)) @@ -634,7 +744,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http if !skipContentSha256Cksum(r) { sha256sum = r.Header.Get("X-Amz-Content-Sha256") } - partMD5, err = objectAPI.PutObjectPart(bucket, object, uploadID, partID, size, r.Body, incomingMD5, sha256sum) + partInfo, err = objectAPI.PutObjectPart(bucket, object, uploadID, partID, size, r.Body, incomingMD5, sha256sum) } if err != nil { errorIf(err, "Unable to create object part.") @@ -642,8 +752,8 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http writeErrorResponse(w, toAPIErrorCode(err), r.URL) return } - if partMD5 != "" { - w.Header().Set("ETag", "\""+partMD5+"\"") + if partInfo.ETag != "" { + w.Header().Set("ETag", "\""+partInfo.ETag+"\"") } writeSuccessResponseHeadersOnly(w) diff --git a/cmd/object-handlers_test.go b/cmd/object-handlers_test.go index ed454fd87..5d0359d0c 100644 --- a/cmd/object-handlers_test.go +++ b/cmd/object-handlers_test.go @@ -918,6 +918,324 @@ func testAPIPutObjectHandler(obj ObjectLayer, instanceType, bucketName string, a } +// Wrapper for calling Copy Object Part API handler tests for both XL multiple disks and single node setup. +func TestAPICopyObjectPartHandler(t *testing.T) { + defer DetectTestLeak(t)() + ExecObjectLayerAPITest(t, testAPICopyObjectPartHandler, []string{"CopyObjectPart"}) +} + +func testAPICopyObjectPartHandler(obj ObjectLayer, instanceType, bucketName string, apiRouter http.Handler, + credentials credential, t *testing.T) { + + objectName := "test-object" + // register event notifier. + err := initEventNotifier(obj) + if err != nil { + t.Fatalf("Initializing event notifiers failed") + } + + // set of byte data for PutObject. + // object has to be created before running tests for Copy Object. + // this is required even to assert the copied object, + bytesData := []struct { + byteData []byte + }{ + {generateBytesData(6 * humanize.KiByte)}, + } + + // set of inputs for uploading the objects before tests for downloading is done. + putObjectInputs := []struct { + bucketName string + objectName string + contentLength int64 + textData []byte + metaData map[string]string + }{ + // case - 1. + {bucketName, objectName, int64(len(bytesData[0].byteData)), bytesData[0].byteData, make(map[string]string)}, + } + sha256sum := "" + // iterate through the above set of inputs and upload the object. + for i, input := range putObjectInputs { + // uploading the object. + _, err = obj.PutObject(input.bucketName, input.objectName, input.contentLength, bytes.NewBuffer(input.textData), input.metaData, sha256sum) + // if object upload fails stop the test. + if err != nil { + t.Fatalf("Put Object case %d: Error uploading object: %v", i+1, err) + } + } + + // Initiate Multipart upload for testing PutObjectPartHandler. + testObject := "testobject" + + // PutObjectPart API HTTP Handler has to be tested in isolation, + // that is without any other handler being registered, + // That's why NewMultipartUpload is initiated using ObjectLayer. + uploadID, err := obj.NewMultipartUpload(bucketName, testObject, nil) + if err != nil { + // Failed to create NewMultipartUpload, abort. + t.Fatalf("Minio %s : %s", instanceType, err) + } + + // test cases with inputs and expected result for Copy Object. + testCases := []struct { + bucketName string + copySourceHeader string // data for "X-Amz-Copy-Source" header. Contains the object to be copied in the URL. + copySourceRange string // data for "X-Amz-Copy-Source-Range" header, contains the byte range offsets of data to be copied. + uploadID string // uploadID of the transaction. + invalidPartNumber bool // Sets an invalid multipart. + maximumPartNumber bool // Sets a maximum parts. + accessKey string + secretKey string + // expected output. + expectedRespStatus int + }{ + // Test case - 1, copy part 1 from from newObject1, ignore request headers. + { + bucketName: bucketName, + uploadID: uploadID, + copySourceHeader: url.QueryEscape("/" + bucketName + "/" + objectName), + accessKey: credentials.AccessKey, + secretKey: credentials.SecretKey, + expectedRespStatus: http.StatusOK, + }, + + // Test case - 2. + // Test case with invalid source object. + { + bucketName: bucketName, + uploadID: uploadID, + copySourceHeader: url.QueryEscape("/"), + accessKey: credentials.AccessKey, + secretKey: credentials.SecretKey, + + expectedRespStatus: http.StatusBadRequest, + }, + + // Test case - 3. + // Test case with new object name is same as object to be copied. + // Fail with file not found. + { + bucketName: bucketName, + uploadID: uploadID, + copySourceHeader: url.QueryEscape("/" + bucketName + "/" + testObject), + accessKey: credentials.AccessKey, + secretKey: credentials.SecretKey, + + expectedRespStatus: http.StatusNotFound, + }, + + // Test case - 4. + // Test case with valid byte range. + { + bucketName: bucketName, + uploadID: uploadID, + copySourceHeader: url.QueryEscape("/" + bucketName + "/" + objectName), + copySourceRange: "bytes=500-4096", + accessKey: credentials.AccessKey, + secretKey: credentials.SecretKey, + + expectedRespStatus: http.StatusOK, + }, + + // Test case - 5. + // Test case with invalid byte range. + { + bucketName: bucketName, + uploadID: uploadID, + copySourceHeader: url.QueryEscape("/" + bucketName + "/" + objectName), + copySourceRange: "bytes=6145-", + accessKey: credentials.AccessKey, + secretKey: credentials.SecretKey, + + expectedRespStatus: http.StatusRequestedRangeNotSatisfiable, + }, + + // Test case - 6. + // Test case with object name missing from source. + // fail with BadRequest. + { + bucketName: bucketName, + uploadID: uploadID, + copySourceHeader: url.QueryEscape("//123"), + accessKey: credentials.AccessKey, + secretKey: credentials.SecretKey, + + expectedRespStatus: http.StatusBadRequest, + }, + + // Test case - 7. + // Test case with non-existent source file. + // Case for the purpose of failing `api.ObjectAPI.GetObjectInfo`. + // Expecting the response status code to http.StatusNotFound (404). + { + bucketName: bucketName, + uploadID: uploadID, + copySourceHeader: url.QueryEscape("/" + bucketName + "/" + "non-existent-object"), + accessKey: credentials.AccessKey, + secretKey: credentials.SecretKey, + + expectedRespStatus: http.StatusNotFound, + }, + + // Test case - 8. + // Test case with non-existent source file. + // Case for the purpose of failing `api.ObjectAPI.PutObjectPart`. + // Expecting the response status code to http.StatusNotFound (404). + { + bucketName: "non-existent-destination-bucket", + uploadID: uploadID, + copySourceHeader: url.QueryEscape("/" + bucketName + "/" + objectName), + accessKey: credentials.AccessKey, + secretKey: credentials.SecretKey, + + expectedRespStatus: http.StatusNotFound, + }, + + // Test case - 9. + // Case with invalid AccessKey. + { + bucketName: bucketName, + uploadID: uploadID, + copySourceHeader: url.QueryEscape("/" + bucketName + "/" + objectName), + accessKey: "Invalid-AccessID", + secretKey: credentials.SecretKey, + + expectedRespStatus: http.StatusForbidden, + }, + + // Test case - 10. + // Case with non-existent upload id. + { + bucketName: bucketName, + uploadID: "-1", + copySourceHeader: url.QueryEscape("/" + bucketName + "/" + objectName), + accessKey: credentials.AccessKey, + secretKey: credentials.SecretKey, + + expectedRespStatus: http.StatusNotFound, + }, + // Test case - 11. + // invalid part number. + { + bucketName: bucketName, + uploadID: uploadID, + copySourceHeader: url.QueryEscape("/" + bucketName + "/" + objectName), + invalidPartNumber: true, + accessKey: credentials.AccessKey, + secretKey: credentials.SecretKey, + expectedRespStatus: http.StatusOK, + }, + // Test case - 12. + // maximum part number. + { + bucketName: bucketName, + uploadID: uploadID, + copySourceHeader: url.QueryEscape("/" + bucketName + "/" + objectName), + maximumPartNumber: true, + accessKey: credentials.AccessKey, + secretKey: credentials.SecretKey, + expectedRespStatus: http.StatusOK, + }, + } + + for i, testCase := range testCases { + var req *http.Request + var reqV2 *http.Request + // initialize HTTP NewRecorder, this records any mutations to response writer inside the handler. + rec := httptest.NewRecorder() + if !testCase.invalidPartNumber || !testCase.maximumPartNumber { + // construct HTTP request for copy object. + req, err = newTestSignedRequestV4("PUT", getCopyObjectPartURL("", testCase.bucketName, testObject, testCase.uploadID, "1"), 0, nil, testCase.accessKey, testCase.secretKey) + } else if testCase.invalidPartNumber { + req, err = newTestSignedRequestV4("PUT", getCopyObjectPartURL("", testCase.bucketName, testObject, testCase.uploadID, "abc"), 0, nil, testCase.accessKey, testCase.secretKey) + } else if testCase.maximumPartNumber { + req, err = newTestSignedRequestV4("PUT", getCopyObjectPartURL("", testCase.bucketName, testObject, testCase.uploadID, "99999"), 0, nil, testCase.accessKey, testCase.secretKey) + } + if err != nil { + t.Fatalf("Test %d: Failed to create HTTP request for copy Object: %v", i+1, err) + } + + // "X-Amz-Copy-Source" header contains the information about the source bucket and the object to copied. + if testCase.copySourceHeader != "" { + req.Header.Set("X-Amz-Copy-Source", testCase.copySourceHeader) + } + if testCase.copySourceRange != "" { + req.Header.Set("X-Amz-Copy-Source-Range", testCase.copySourceRange) + } + // Since `apiRouter` satisfies `http.Handler` it has a ServeHTTP to execute the logic of the handler. + // Call the ServeHTTP to execute the handler, `func (api objectAPIHandlers) CopyObjectHandler` handles the request. + apiRouter.ServeHTTP(rec, req) + // Assert the response code with the expected status. + if rec.Code != testCase.expectedRespStatus { + t.Fatalf("Test %d: %s: Expected the response status to be `%d`, but instead found `%d`", i+1, instanceType, testCase.expectedRespStatus, rec.Code) + } + if rec.Code == http.StatusOK { + // See if the new part has been uploaded. + // testing whether the copy was successful. + var results ListPartsInfo + results, err = obj.ListObjectParts(testCase.bucketName, testObject, testCase.uploadID, 0, 1) + if err != nil { + t.Fatalf("Test %d: %s: Failed to look for copied object part: %s", i+1, instanceType, err) + } + if len(results.Parts) != 1 { + t.Fatalf("Test %d: %s: Expected only one entry returned %d entries", i+1, instanceType, len(results.Parts)) + } + } + + // Verify response of the V2 signed HTTP request. + // initialize HTTP NewRecorder, this records any mutations to response writer inside the handler. + recV2 := httptest.NewRecorder() + + reqV2, err = newTestRequest("PUT", getCopyObjectPartURL("", testCase.bucketName, testObject, testCase.uploadID, "1"), 0, nil) + if err != nil { + t.Fatalf("Test %d: Failed to create HTTP request for copy Object: %v", i+1, err) + } + // "X-Amz-Copy-Source" header contains the information about the source bucket and the object to copied. + if testCase.copySourceHeader != "" { + reqV2.Header.Set("X-Amz-Copy-Source", testCase.copySourceHeader) + } + if testCase.copySourceRange != "" { + reqV2.Header.Set("X-Amz-Copy-Source-Range", testCase.copySourceRange) + } + + err = signRequestV2(reqV2, testCase.accessKey, testCase.secretKey) + if err != nil { + t.Fatalf("Failed to V2 Sign the HTTP request: %v.", err) + } + + // Since `apiRouter` satisfies `http.Handler` it has a ServeHTTP to execute the logic of the handler. + // Call the ServeHTTP to execute the handler. + apiRouter.ServeHTTP(recV2, reqV2) + if recV2.Code != testCase.expectedRespStatus { + t.Errorf("Test %d: %s: Expected the response status to be `%d`, but instead found `%d`", i+1, instanceType, testCase.expectedRespStatus, recV2.Code) + } + } + + // HTTP request for testing when `ObjectLayer` is set to `nil`. + // There is no need to use an existing bucket and valid input for creating the request + // since the `objectLayer==nil` check is performed before any other checks inside the handlers. + // The only aim is to generate an HTTP request in a way that the relevant/registered end point is evoked/called. + nilBucket := "dummy-bucket" + nilObject := "dummy-object" + + nilReq, err := newTestSignedRequestV4("PUT", getCopyObjectPartURL("", nilBucket, nilObject, "0", "0"), + 0, bytes.NewReader([]byte("testNilObjLayer")), "", "") + if err != nil { + t.Errorf("Minio %s: Failed to create http request for testing the response when object Layer is set to `nil`.", instanceType) + } + + // Below is how CopyObjectPartHandler is registered. + // bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(api.CopyObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}") + // Its necessary to set the "X-Amz-Copy-Source" header for the request to be accepted by the handler. + nilReq.Header.Set("X-Amz-Copy-Source", url.QueryEscape("/"+nilBucket+"/"+nilObject)) + + // execute the object layer set to `nil` test. + // `ExecObjectLayerAPINilTest` manages the operation. + ExecObjectLayerAPINilTest(t, nilBucket, nilObject, instanceType, apiRouter, nilReq) + +} + // Wrapper for calling Copy Object API handler tests for both XL multiple disks and single node setup. func TestAPICopyObjectHandler(t *testing.T) { defer DetectTestLeak(t)() diff --git a/cmd/object_api_suite_test.go b/cmd/object_api_suite_test.go index 88ad87964..e3cb467ad 100644 --- a/cmd/object_api_suite_test.go +++ b/cmd/object_api_suite_test.go @@ -106,15 +106,18 @@ func testMultipartObjectCreation(obj ObjectLayer, instanceType string, c TestErr for i := 1; i <= 10; i++ { expectedMD5Sumhex := getMD5Hash(data) - var calculatedMD5sum string - calculatedMD5sum, err = obj.PutObjectPart("bucket", "key", uploadID, i, int64(len(data)), bytes.NewBuffer(data), expectedMD5Sumhex, "") + var calcPartInfo PartInfo + calcPartInfo, err = obj.PutObjectPart("bucket", "key", uploadID, i, int64(len(data)), bytes.NewBuffer(data), expectedMD5Sumhex, "") if err != nil { c.Errorf("%s: %s", instanceType, err) } - if calculatedMD5sum != expectedMD5Sumhex { + if calcPartInfo.ETag != expectedMD5Sumhex { c.Errorf("MD5 Mismatch") } - completedParts.Parts = append(completedParts.Parts, completePart{PartNumber: i, ETag: calculatedMD5sum}) + completedParts.Parts = append(completedParts.Parts, completePart{ + PartNumber: i, + ETag: calcPartInfo.ETag, + }) } objInfo, err := obj.CompleteMultipartUpload("bucket", "key", uploadID, completedParts.Parts) if err != nil { @@ -153,12 +156,12 @@ func testMultipartObjectAbort(obj ObjectLayer, instanceType string, c TestErrHan expectedMD5Sumhex := getMD5Hash([]byte(randomString)) metadata["md5"] = expectedMD5Sumhex - var calculatedMD5sum string - calculatedMD5sum, err = obj.PutObjectPart("bucket", "key", uploadID, i, int64(len(randomString)), bytes.NewBufferString(randomString), expectedMD5Sumhex, "") + var calcPartInfo PartInfo + calcPartInfo, err = obj.PutObjectPart("bucket", "key", uploadID, i, int64(len(randomString)), bytes.NewBufferString(randomString), expectedMD5Sumhex, "") if err != nil { c.Fatalf("%s: %s", instanceType, err) } - if calculatedMD5sum != expectedMD5Sumhex { + if calcPartInfo.ETag != expectedMD5Sumhex { c.Errorf("Md5 Mismatch") } parts[i] = expectedMD5Sumhex diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index a9c544576..98c6bc15e 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -1419,6 +1419,13 @@ func getPutObjectPartURL(endPoint, bucketName, objectName, uploadID, partNumber return makeTestTargetURL(endPoint, bucketName, objectName, queryValues) } +func getCopyObjectPartURL(endPoint, bucketName, objectName, uploadID, partNumber string) string { + queryValues := url.Values{} + queryValues.Set("uploadId", uploadID) + queryValues.Set("partNumber", partNumber) + return makeTestTargetURL(endPoint, bucketName, objectName, queryValues) +} + // return URL for fetching object from the bucket. func getGetObjectURL(endPoint, bucketName, objectName string) string { return makeTestTargetURL(endPoint, bucketName, objectName, url.Values{}) @@ -1883,22 +1890,22 @@ func ExecObjectLayerAPIAnonTest(t *testing.T, testName, bucketName, objectName, } } -// ExecObjectLayerAPINilTest - Sets the object layer to `nil`, and calls rhe registered object layer API endpoint, and assert the error response. -// The purpose is to validate the API handlers response when the object layer is uninitialized. -// Usage hint: Should be used at the end of the API end points tests (ex: check the last few lines of `testAPIListObjectPartsHandler`), need a sample HTTP request -// to be sent as argument so that the relevant handler is called, -// the handler registration is expected to be done since its called from within the API handler tests, -// the reference to the registered HTTP handler has to be sent as an argument. +// ExecObjectLayerAPINilTest - Sets the object layer to `nil`, and calls rhe registered object layer API endpoint, +// and assert the error response. The purpose is to validate the API handlers response when the object layer is uninitialized. +// Usage hint: Should be used at the end of the API end points tests (ex: check the last few lines of `testAPIListObjectPartsHandler`), +// need a sample HTTP request to be sent as argument so that the relevant handler is called, the handler registration is expected +// to be done since its called from within the API handler tests, the reference to the registered HTTP handler has to be sent +// as an argument. func ExecObjectLayerAPINilTest(t TestErrHandler, bucketName, objectName, instanceType string, apiRouter http.Handler, req *http.Request) { // httptest Recorder to capture all the response by the http handler. rec := httptest.NewRecorder() // The API handler gets the referece to the object layer via the global object Layer, // setting it to `nil` in order test for handlers response for uninitialized object layer. - globalObjLayerMutex.Lock() globalObjectAPI = nil globalObjLayerMutex.Unlock() + // call the HTTP handler. apiRouter.ServeHTTP(rec, req) @@ -1909,7 +1916,8 @@ func ExecObjectLayerAPINilTest(t TestErrHandler, bucketName, objectName, instanc t.Errorf("Object API Nil Test expected to fail with %d, but failed with %d", serverNotInitializedErr, rec.Code) } // expected error response in bytes when objectLayer is not initialized, or set to `nil`. - expectedErrResponse := encodeResponse(getAPIErrorResponse(getAPIError(ErrServerNotInitialized), getGetObjectURL("", bucketName, objectName))) + expectedErrResponse := encodeResponse(getAPIErrorResponse(getAPIError(ErrServerNotInitialized), + getGetObjectURL("", bucketName, objectName))) // HEAD HTTP Request doesn't contain body in its response, // for other type of HTTP requests compare the response body content with the expected one. @@ -2093,6 +2101,9 @@ func registerBucketLevelFunc(bucket *router.Router, api objectAPIHandlers, apiFu case "NewMultipart": // Register New Multipart upload handler. bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(api.NewMultipartUploadHandler).Queries("uploads", "") + case "CopyObjectPart": + // Register CopyObjectPart handler. + bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(api.CopyObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}") case "PutObjectPart": // Register PutObjectPart handler. bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(api.PutObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}") diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 10fd79f1f..cbaa58f7f 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -499,7 +499,8 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st uploadIDPath := path.Join(bucket, object, uploadID) tempUploadIDPath := uploadID // Write updated `xl.json` to all disks. - if err := writeSameXLMetadata(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum); err != nil { + err := writeSameXLMetadata(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum) + if err != nil { return "", toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath) } // delete the tmp path later in case we fail to rename (ignore @@ -538,14 +539,49 @@ func (xl xlObjects) NewMultipartUpload(bucket, object string, meta map[string]st return xl.newMultipartUpload(bucket, object, meta) } +// CopyObjectPart - reads incoming stream and internally erasure codes +// them. This call is similar to put object part operation but the source +// data is read from an existing object. +// +// Implements S3 compatible Upload Part Copy API. +func (xl xlObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64) (PartInfo, error) { + if err := checkNewMultipartArgs(srcBucket, srcObject, xl); err != nil { + return PartInfo{}, err + } + + // Initialize pipe. + pipeReader, pipeWriter := io.Pipe() + + go func() { + startOffset := int64(0) // Read the whole file. + if gerr := xl.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter); gerr != nil { + errorIf(gerr, "Unable to read %s of the object `%s/%s`.", srcBucket, srcObject) + pipeWriter.CloseWithError(toObjectErr(gerr, srcBucket, srcObject)) + return + } + pipeWriter.Close() // Close writer explicitly signalling we wrote all data. + }() + + partInfo, err := xl.PutObjectPart(dstBucket, dstObject, uploadID, partID, length, pipeReader, "", "") + if err != nil { + return PartInfo{}, toObjectErr(err, dstBucket, dstObject) + } + + // Explicitly close the reader. + pipeReader.Close() + + // Success. + return partInfo, nil +} + // PutObjectPart - reads incoming stream and internally erasure codes // them. This call is similar to single put operation but it is part // of the multipart transaction. // // Implements S3 compatible Upload Part API. -func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (string, error) { +func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (PartInfo, error) { if err := checkPutObjectPartArgs(bucket, object, xl); err != nil { - return "", err + return PartInfo{}, err } var partsMetadata []xlMetaV1 @@ -558,14 +594,15 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Validates if upload ID exists. if !xl.isUploadIDExists(bucket, object, uploadID) { preUploadIDLock.RUnlock() - return "", traceError(InvalidUploadID{UploadID: uploadID}) + return PartInfo{}, traceError(InvalidUploadID{UploadID: uploadID}) } + // Read metadata associated with the object from all disks. partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket, uploadIDPath) if !isDiskQuorum(errs, xl.writeQuorum) { preUploadIDLock.RUnlock() - return "", toObjectErr(traceError(errXLWriteQuorum), bucket, object) + return PartInfo{}, toObjectErr(traceError(errXLWriteQuorum), bucket, object) } preUploadIDLock.RUnlock() @@ -575,7 +612,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Pick one from the first valid metadata. xlMeta, err := pickValidXLMeta(partsMetadata, modTime) if err != nil { - return "", err + return PartInfo{}, err } onlineDisks = getOrderedDisks(xlMeta.Erasure.Distribution, onlineDisks) @@ -633,13 +670,13 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Erasure code data and write across all disks. sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaTmpBucket, tmpPartPath, teeReader, allowEmpty, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, bitRotAlgo, xl.writeQuorum) if err != nil { - return "", toObjectErr(err, bucket, object) + return PartInfo{}, toObjectErr(err, bucket, object) } // Should return IncompleteBody{} error when reader has fewer bytes // than specified in request header. if sizeWritten < size { - return "", traceError(IncompleteBody{}) + return PartInfo{}, traceError(IncompleteBody{}) } // For size == -1, perhaps client is sending in chunked encoding @@ -653,14 +690,14 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s if md5Hex != "" { if newMD5Hex != md5Hex { // Returns md5 mismatch. - return "", traceError(BadDigest{md5Hex, newMD5Hex}) + return PartInfo{}, traceError(BadDigest{md5Hex, newMD5Hex}) } } if sha256sum != "" { newSHA256sum := hex.EncodeToString(sha256Writer.Sum(nil)) if newSHA256sum != sha256sum { - return "", traceError(SHA256Mismatch{}) + return PartInfo{}, traceError(SHA256Mismatch{}) } } @@ -671,20 +708,20 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Validate again if upload ID still exists. if !xl.isUploadIDExists(bucket, object, uploadID) { - return "", traceError(InvalidUploadID{UploadID: uploadID}) + return PartInfo{}, traceError(InvalidUploadID{UploadID: uploadID}) } // Rename temporary part file to its final location. partPath := path.Join(uploadIDPath, partSuffix) err = renamePart(onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, xl.writeQuorum) if err != nil { - return "", toObjectErr(err, minioMetaMultipartBucket, partPath) + return PartInfo{}, toObjectErr(err, minioMetaMultipartBucket, partPath) } // Read metadata again because it might be updated with parallel upload of another part. partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaMultipartBucket, uploadIDPath) if !isDiskQuorum(errs, xl.writeQuorum) { - return "", toObjectErr(traceError(errXLWriteQuorum), bucket, object) + return PartInfo{}, toObjectErr(traceError(errXLWriteQuorum), bucket, object) } // Get current highest version based on re-read partsMetadata. @@ -693,7 +730,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Pick one from the first valid metadata. xlMeta, err = pickValidXLMeta(partsMetadata, modTime) if err != nil { - return "", err + return PartInfo{}, err } // Once part is successfully committed, proceed with updating XL metadata. @@ -720,15 +757,25 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Writes a unique `xl.json` each disk carrying new checksum related information. if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum); err != nil { - return "", toObjectErr(err, minioMetaTmpBucket, tempXLMetaPath) + return PartInfo{}, toObjectErr(err, minioMetaTmpBucket, tempXLMetaPath) } rErr := commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum) if rErr != nil { - return "", toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath) + return PartInfo{}, toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath) + } + + fi, err := xl.statPart(bucket, object, uploadID, partSuffix) + if err != nil { + return PartInfo{}, toObjectErr(rErr, minioMetaMultipartBucket, partSuffix) } // Return success. - return newMD5Hex, nil + return PartInfo{ + PartNumber: partID, + LastModified: fi.ModTime, + ETag: newMD5Hex, + Size: fi.Size, + }, nil } // listObjectParts - wrapper reading `xl.json` for a given object and @@ -772,7 +819,7 @@ func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberM if err != nil { return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, path.Join(uploadID, part.Name)) } - result.Parts = append(result.Parts, partInfo{ + result.Parts = append(result.Parts, PartInfo{ PartNumber: part.Number, ETag: part.ETag, LastModified: fi.ModTime, diff --git a/docs/minio-limitations.md b/docs/minio-limitations.md index 539e6ef9f..d7f49e660 100644 --- a/docs/minio-limitations.md +++ b/docs/minio-limitations.md @@ -47,4 +47,3 @@ We found the following APIs to be redundant or less useful outside of AWS. If yo - ObjectACL (Use bucket policies instead) - ObjectTorrent -- ObjectCopyPart