feat: implement posix UploadCopyPart

This commit is contained in:
Ben McClelland
2023-06-30 21:09:44 -07:00
parent db484eb900
commit 47dea2db7c
4 changed files with 139 additions and 107 deletions

View File

@@ -39,11 +39,10 @@ type Backend interface {
CreateMultipartUpload(*s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error)
CompleteMultipartUpload(bucket, object, uploadID string, parts []types.Part) (*s3.CompleteMultipartUploadOutput, error)
AbortMultipartUpload(*s3.AbortMultipartUploadInput) error
ListMultipartUploads(output *s3.ListMultipartUploadsInput) (s3response.ListMultipartUploadsResponse, error)
ListMultipartUploads(*s3.ListMultipartUploadsInput) (s3response.ListMultipartUploadsResponse, error)
ListObjectParts(bucket, object, uploadID string, partNumberMarker int, maxParts int) (s3response.ListPartsResponse, error)
CopyPart(srcBucket, srcObject, DstBucket, uploadID, rangeHeader string, part int) (*types.CopyPartResult, error)
PutObjectPart(bucket, object, uploadID string, part int, length int64, r io.Reader) (etag string, err error)
UploadPartCopy(*s3.UploadPartCopyInput) (*s3.UploadPartCopyOutput, error)
UploadPartCopy(*s3.UploadPartCopyInput) (s3response.CopyObjectResult, error)
PutObject(*s3.PutObjectInput) (string, error)
HeadObject(bucket, object string) (*s3.HeadObjectOutput, error)
@@ -86,8 +85,8 @@ func (BackendUnsupported) PutObjectAcl(*s3.PutObjectAclInput) error {
func (BackendUnsupported) RestoreObject(bucket, object string, restoreRequest *s3.RestoreObjectInput) error {
return s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) UploadPartCopy(*s3.UploadPartCopyInput) (*s3.UploadPartCopyOutput, error) {
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
func (BackendUnsupported) UploadPartCopy(*s3.UploadPartCopyInput) (s3response.CopyObjectResult, error) {
return s3response.CopyObjectResult{}, s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) GetBucketAcl(bucket string) ([]byte, error) {
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
@@ -117,9 +116,6 @@ func (BackendUnsupported) ListMultipartUploads(output *s3.ListMultipartUploadsIn
func (BackendUnsupported) ListObjectParts(bucket, object, uploadID string, partNumberMarker int, maxParts int) (s3response.ListPartsResponse, error) {
return s3response.ListPartsResponse{}, s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) CopyPart(srcBucket, srcObject, DstBucket, uploadID, rangeHeader string, part int) (*types.CopyPartResult, error) {
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) PutObjectPart(bucket, object, uploadID string, part int, length int64, r io.Reader) (etag string, err error) {
return "", s3err.GetAPIError(s3err.ErrNotImplemented)
}

View File

@@ -693,10 +693,6 @@ func (p *Posix) ListObjectParts(bucket, object, uploadID string, partNumberMarke
}, nil
}
// TODO: copy part
// func (p *Posix) CopyPart(srcBucket, srcObject, DstBucket, uploadID, rangeHeader string, part int) (*types.CopyPartResult, error) {
// }
func (p *Posix) PutObjectPart(bucket, object, uploadID string, part int, length int64, r io.Reader) (string, error) {
_, err := os.Stat(bucket)
if errors.Is(err, fs.ErrNotExist) {
@@ -708,6 +704,15 @@ func (p *Posix) PutObjectPart(bucket, object, uploadID string, part int, length
sum := sha256.Sum256([]byte(object))
objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))
_, err = os.Stat(filepath.Join(bucket, objdir, uploadID))
if errors.Is(err, fs.ErrNotExist) {
return "", s3err.GetAPIError(s3err.ErrNoSuchUpload)
}
if err != nil {
return "", fmt.Errorf("stat uploadid: %w", err)
}
partPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", part))
f, err := openTmpFile(filepath.Join(bucket, objdir),
@@ -736,6 +741,111 @@ func (p *Posix) PutObjectPart(bucket, object, uploadID string, part int, length
return etag, nil
}
func (p *Posix) UploadPartCopy(upi *s3.UploadPartCopyInput) (s3response.CopyObjectResult, error) {
_, err := os.Stat(*upi.Bucket)
if errors.Is(err, fs.ErrNotExist) {
return s3response.CopyObjectResult{}, s3err.GetAPIError(s3err.ErrNoSuchBucket)
}
if err != nil {
return s3response.CopyObjectResult{}, fmt.Errorf("stat bucket: %w", err)
}
sum := sha256.Sum256([]byte(*upi.Key))
objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))
_, err = os.Stat(filepath.Join(*upi.Bucket, objdir, *upi.UploadId))
if errors.Is(err, fs.ErrNotExist) {
return s3response.CopyObjectResult{}, s3err.GetAPIError(s3err.ErrNoSuchUpload)
}
if err != nil {
return s3response.CopyObjectResult{}, fmt.Errorf("stat uploadid: %w", err)
}
partPath := filepath.Join(objdir, *upi.UploadId, fmt.Sprintf("%v", upi.PartNumber))
substrs := strings.SplitN(*upi.CopySource, "/", 2)
if len(substrs) != 2 {
return s3response.CopyObjectResult{}, s3err.GetAPIError(s3err.ErrInvalidCopySource)
}
srcBucket := substrs[0]
srcObject := substrs[1]
_, err = os.Stat(srcBucket)
if errors.Is(err, fs.ErrNotExist) {
return s3response.CopyObjectResult{}, s3err.GetAPIError(s3err.ErrNoSuchBucket)
}
if err != nil {
return s3response.CopyObjectResult{}, fmt.Errorf("stat bucket: %w", err)
}
objPath := filepath.Join(srcBucket, srcObject)
fi, err := os.Stat(objPath)
if errors.Is(err, fs.ErrNotExist) {
return s3response.CopyObjectResult{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if err != nil {
return s3response.CopyObjectResult{}, fmt.Errorf("stat object: %w", err)
}
startOffset, length, err := backend.ParseRange(fi, *upi.CopySourceRange)
if err != nil {
return s3response.CopyObjectResult{}, err
}
if length == -1 {
length = fi.Size() - startOffset + 1
}
if startOffset+length > fi.Size()+1 {
return s3response.CopyObjectResult{}, s3err.GetAPIError(s3err.ErrInvalidRequest)
}
f, err := openTmpFile(filepath.Join(*upi.Bucket, objdir),
*upi.Bucket, partPath, length)
if err != nil {
return s3response.CopyObjectResult{}, fmt.Errorf("open temp file: %w", err)
}
defer f.cleanup()
srcf, err := os.Open(objPath)
if errors.Is(err, fs.ErrNotExist) {
return s3response.CopyObjectResult{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if err != nil {
return s3response.CopyObjectResult{}, fmt.Errorf("open object: %w", err)
}
defer srcf.Close()
rdr := io.NewSectionReader(srcf, startOffset, length)
hash := md5.New()
tr := io.TeeReader(rdr, hash)
_, err = io.Copy(f, tr)
if err != nil {
return s3response.CopyObjectResult{}, fmt.Errorf("copy part data: %w", err)
}
err = f.link()
if err != nil {
return s3response.CopyObjectResult{}, fmt.Errorf("link object in namespace: %w", err)
}
dataSum := hash.Sum(nil)
etag := hex.EncodeToString(dataSum)
xattr.Set(filepath.Join(*upi.Bucket, partPath), etagkey, []byte(etag))
fi, err = os.Stat(filepath.Join(*upi.Bucket, partPath))
if err != nil {
return s3response.CopyObjectResult{}, fmt.Errorf("stat part path: %w", err)
}
return s3response.CopyObjectResult{
ETag: etag,
LastModified: fi.ModTime(),
}, nil
}
func (p *Posix) PutObject(po *s3.PutObjectInput) (string, error) {
_, err := os.Stat(*po.Bucket)
if errors.Is(err, fs.ErrNotExist) {