change server-side uploader to be fully streaming-based (#1066)
This PR fixes the upload behavior to avoid using the `/tmp` folder as a staging directory for large uploads; instead, it relies on the client upload stream itself to upload the object in its entirety.
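For background: Go's `Request.ParseMultipartForm(maxMemory)` buffers file parts, writing anything beyond the in-memory budget to temporary files in the OS temp directory (typically `/tmp`), while `Request.MultipartReader()` exposes each part as a plain reader over the request body. The sketch below (handler names and the placeholder sink are illustrative, not part of this change) shows the difference this diff exploits:

```go
package main

import (
	"io"
	"net/http"
)

// bufferedUpload uses ParseMultipartForm: file parts beyond the in-memory
// budget are staged as temporary files (os.TempDir, usually /tmp) before the
// handler can read them.
func bufferedUpload(w http.ResponseWriter, r *http.Request) {
	if err := r.ParseMultipartForm(32 << 20); err != nil { // ~32 MiB kept in memory, the rest goes to disk
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	defer r.MultipartForm.RemoveAll() // staged temp files must be cleaned up afterwards
	// files are now served from memory or /tmp, no longer from the client stream
}

// streamingUpload uses MultipartReader: each part is an io.Reader over the
// request body, so bytes can be forwarded to their destination as they arrive.
func streamingUpload(w http.ResponseWriter, r *http.Request) {
	mr, err := r.MultipartReader()
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	for {
		part, err := mr.NextPart()
		if err == io.EOF {
			break
		}
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		_, _ = io.Copy(io.Discard, part) // placeholder sink; the real handler pipes the part into putObject
	}
}

func main() {
	http.HandleFunc("/buffered", bufferedUpload)
	http.HandleFunc("/streaming", streamingUpload)
	_ = http.ListenAndServe(":8080", nil)
}
```

With the streaming variant, nothing is staged on the console server's disk; the part reader is handed straight to the MinIO client.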
@@ -21,11 +21,11 @@ import (
	"fmt"
	"io"
	"log"
	"mime/multipart"
	"net/http"
	"path"
	"path/filepath"
	"regexp"
	"strconv"
	"strings"
	"time"

@@ -468,16 +468,35 @@ func uploadFiles(ctx context.Context, client MinioClient, params user_api.PostBu
 		prefix = *params.Prefix
 	}
 
-	// get object files from request
-	o, err := getFormFiles(params.HTTPRequest)
+	// parse a request body as multipart/form-data.
+	// 32 << 20 is default max memory
+	mr, err := params.HTTPRequest.MultipartReader()
 	if err != nil {
 		return err
 	}
-	defer o.Close()
 
-	// upload files one by one
-	for _, obj := range o.Files() {
-		if err := uploadObject(ctx, client, params.BucketName, path.Join(prefix, obj.name), obj.size, obj.file); err != nil {
+	for {
+		p, err := mr.NextPart()
+		if err == io.EOF {
+			break
+		}
+
+		size, err := strconv.ParseInt(p.FormName(), 10, 64)
+		if err != nil {
+			return err
+		}
+
+		contentType := p.Header.Get("content-type")
+		if contentType == "" {
+			contentType = mimedb.TypeByExtension(filepath.Ext(p.FileName()))
+		}
+
+		_, err = client.putObject(ctx, params.BucketName, path.Join(prefix, p.FileName()), p, size, minio.PutObjectOptions{
+			ContentType:      contentType,
+			DisableMultipart: true, // Do not upload as multipart stream for console uploader.
+		})
+
+		if err != nil {
 			return err
 		}
 	}
@@ -485,71 +504,6 @@ func uploadFiles(ctx context.Context, client MinioClient, params user_api.PostBu
 	return nil
 }
-
-type objectFile struct {
-	name string
-	size int64
-	file multipart.File
-}
-
-type objectFiles struct {
-	files []objectFile
-	form  *multipart.Form
-}
-
-func (o objectFiles) Files() []objectFile {
-	return o.files
-}
-
-func (o objectFiles) Close() error {
-	for _, fl := range o.files {
-		fl.file.Close()
-	}
-	if o.form != nil {
-		return o.form.RemoveAll()
-	}
-	return nil
-}
-
-// getFormFiles parses the request body and gets all the files from the Request
-// it includes name, size and file content
-func getFormFiles(r *http.Request) (o objectFiles, err error) {
-	if r == nil {
-		return o, errors.New("http.Request is nil")
-	}
-
-	// parse a request body as multipart/form-data.
-	// 32 << 20 is default max memory
-	if err := r.ParseMultipartForm(32 << 20); err != nil {
-		return o, err
-	}
-
-	for fileName, files := range r.MultipartForm.File {
-		if len(files) > 0 {
-			f, err := files[0].Open()
-			if err != nil {
-				return o, err
-			}
-			o.files = append(o.files, objectFile{
-				name: fileName,
-				size: files[0].Size,
-				file: f,
-			})
-		}
-	}
-	o.form = r.MultipartForm
-
-	return o, nil
-}
-
-func uploadObject(ctx context.Context, client MinioClient, bucket, object string, size int64, reader io.ReadCloser) error {
-	contentType := mimedb.TypeByExtension(filepath.Ext(object))
-	_, err := client.putObject(ctx, bucket, object, reader, size, minio.PutObjectOptions{
-		ContentType:      contentType,
-		DisableMultipart: true, // Do not upload as multipart stream for console uploader.
-	})
-	return err
-}
 
 // getShareObjectResponse returns a share object url
 func getShareObjectResponse(session *models.Principal, params user_api.ShareObjectParams) (*string, *models.Error) {
 	ctx := context.Background()
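Note the convention the new handler relies on: each part's form field name carries the object size as a decimal string (parsed with `strconv.ParseInt(p.FormName(), 10, 64)`), and the part's filename carries the object name. A minimal client-side sketch that builds a matching request body, assuming that convention (the helper, file path, and endpoint URL below are illustrative, not part of this change):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"os"
	"path/filepath"
)

// buildUploadBody writes one multipart part per file. The form field name is
// the file size in bytes and the part filename is the object name, matching
// what the streaming handler parses on the server side.
func buildUploadBody(paths []string) (*bytes.Buffer, string, error) {
	body := &bytes.Buffer{}
	w := multipart.NewWriter(body)
	for _, p := range paths {
		st, err := os.Stat(p)
		if err != nil {
			return nil, "", err
		}
		f, err := os.Open(p)
		if err != nil {
			return nil, "", err
		}
		part, err := w.CreateFormFile(fmt.Sprintf("%d", st.Size()), filepath.Base(p))
		if err == nil {
			_, err = io.Copy(part, f)
		}
		f.Close()
		if err != nil {
			return nil, "", err
		}
	}
	if err := w.Close(); err != nil {
		return nil, "", err
	}
	return body, w.FormDataContentType(), nil
}

func main() {
	// "./example.txt" and the endpoint URL are placeholders for this sketch.
	body, contentType, err := buildUploadBody([]string{"./example.txt"})
	if err != nil {
		panic(err)
	}
	req, err := http.NewRequest(http.MethodPost, "http://localhost:9090/api/v1/buckets/mybucket/objects/upload", body)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", contentType)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}
```

A real client would typically stream the body through an `io.Pipe` rather than buffering it in memory; the essential part here is the size-in-field-name convention that the server-side loop parses.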