mirror of
https://github.com/versity/versitygw.git
synced 2026-01-03 10:35:15 +00:00
feat: sync recent posix changes to scoutfs
This syncs recent updates to posix for scoutfs backend including the extra metadata such as Content-Disposition, Content-Language, Cache-Control and Expires. This also fixes the directory object listings that have a double trailing slash due to the change in the backend.Walk(). This also simplifies head-object to call the posix on and then post process for glacier changes. This allows keeping in closer sync with posix head-object over time.
This commit is contained in:
@@ -1264,7 +1264,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu s3response.Create
|
||||
}
|
||||
}
|
||||
|
||||
err = p.storeObjectMetadata(nil, bucket, filepath.Join(objdir, uploadID), ObjectMetadata{
|
||||
err = p.storeObjectMetadata(nil, bucket, filepath.Join(objdir, uploadID), objectMetadata{
|
||||
ContentType: mpu.ContentType,
|
||||
ContentEncoding: mpu.ContentEncoding,
|
||||
ContentDisposition: mpu.ContentDisposition,
|
||||
@@ -1540,7 +1540,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
|
||||
upiddir := filepath.Join(objdir, uploadID)
|
||||
|
||||
userMetaData := make(map[string]string)
|
||||
objMeta := p.loadObjectMetaData(bucket, upiddir, userMetaData)
|
||||
objMeta := p.loadObjectMetaData(bucket, upiddir, nil, userMetaData)
|
||||
err = p.storeObjectMetadata(f.File(), bucket, object, objMeta)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -1822,7 +1822,7 @@ func (p *Posix) retrieveUploadId(bucket, object string) (string, [32]byte, error
|
||||
return entries[0].Name(), sum, nil
|
||||
}
|
||||
|
||||
type ObjectMetadata struct {
|
||||
type objectMetadata struct {
|
||||
ContentType *string
|
||||
ContentEncoding *string
|
||||
ContentDisposition *string
|
||||
@@ -1833,10 +1833,10 @@ type ObjectMetadata struct {
|
||||
|
||||
// fill out the user metadata map with the metadata for the object
|
||||
// and return object meta properties as `ObjectMetadata`
|
||||
func (p *Posix) loadObjectMetaData(bucket, object string, m map[string]string) ObjectMetadata {
|
||||
func (p *Posix) loadObjectMetaData(bucket, object string, fi *os.FileInfo, m map[string]string) objectMetadata {
|
||||
ents, err := p.meta.ListAttributes(bucket, object)
|
||||
if err != nil || len(ents) == 0 {
|
||||
return ObjectMetadata{}
|
||||
return objectMetadata{}
|
||||
}
|
||||
|
||||
if m != nil {
|
||||
@@ -1856,13 +1856,20 @@ func (p *Posix) loadObjectMetaData(bucket, object string, m map[string]string) O
|
||||
}
|
||||
}
|
||||
|
||||
var result ObjectMetadata
|
||||
var result objectMetadata
|
||||
|
||||
b, err := p.meta.RetrieveAttribute(nil, bucket, object, contentTypeHdr)
|
||||
if err == nil {
|
||||
result.ContentType = backend.GetPtrFromString(string(b))
|
||||
}
|
||||
|
||||
if (result.ContentType == nil || *result.ContentType == "") && fi != nil {
|
||||
if (*fi).IsDir() {
|
||||
// this is the media type for directories in AWS and Nextcloud
|
||||
result.ContentType = backend.GetPtrFromString("application/x-directory")
|
||||
}
|
||||
}
|
||||
|
||||
b, err = p.meta.RetrieveAttribute(nil, bucket, object, contentEncHdr)
|
||||
if err == nil {
|
||||
result.ContentEncoding = backend.GetPtrFromString(string(b))
|
||||
@@ -1891,7 +1898,7 @@ func (p *Posix) loadObjectMetaData(bucket, object string, m map[string]string) O
|
||||
return result
|
||||
}
|
||||
|
||||
func (p *Posix) storeObjectMetadata(f *os.File, bucket, object string, m ObjectMetadata) error {
|
||||
func (p *Posix) storeObjectMetadata(f *os.File, bucket, object string, m objectMetadata) error {
|
||||
if getString(m.ContentType) != "" {
|
||||
err := p.meta.StoreAttribute(f, bucket, object, contentTypeHdr, []byte(*m.ContentType))
|
||||
if err != nil {
|
||||
@@ -2252,7 +2259,7 @@ func (p *Posix) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3resp
|
||||
|
||||
userMetaData := make(map[string]string)
|
||||
upiddir := filepath.Join(objdir, uploadID)
|
||||
p.loadObjectMetaData(bucket, upiddir, userMetaData)
|
||||
p.loadObjectMetaData(bucket, upiddir, nil, userMetaData)
|
||||
|
||||
return s3response.ListPartsResult{
|
||||
Bucket: bucket,
|
||||
@@ -2914,7 +2921,7 @@ func (p *Posix) PutObject(ctx context.Context, po s3response.PutObjectInput) (s3
|
||||
return s3response.PutObjectOutput{}, fmt.Errorf("set etag attr: %w", err)
|
||||
}
|
||||
|
||||
err = p.storeObjectMetadata(f.File(), *po.Bucket, *po.Key, ObjectMetadata{
|
||||
err = p.storeObjectMetadata(f.File(), *po.Bucket, *po.Key, objectMetadata{
|
||||
ContentType: po.ContentType,
|
||||
ContentEncoding: po.ContentEncoding,
|
||||
ContentLanguage: po.ContentLanguage,
|
||||
@@ -3447,7 +3454,7 @@ func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.GetO
|
||||
if fi.IsDir() {
|
||||
userMetaData := make(map[string]string)
|
||||
|
||||
objMeta := p.loadObjectMetaData(bucket, object, userMetaData)
|
||||
objMeta := p.loadObjectMetaData(bucket, object, &fi, userMetaData)
|
||||
b, err := p.meta.RetrieveAttribute(nil, bucket, object, etagkey)
|
||||
etag := string(b)
|
||||
if err != nil {
|
||||
@@ -3497,7 +3504,7 @@ func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.GetO
|
||||
|
||||
userMetaData := make(map[string]string)
|
||||
|
||||
objMeta := p.loadObjectMetaData(bucket, object, userMetaData)
|
||||
objMeta := p.loadObjectMetaData(bucket, object, &fi, userMetaData)
|
||||
|
||||
b, err := p.meta.RetrieveAttribute(nil, bucket, object, etagkey)
|
||||
etag := string(b)
|
||||
@@ -3708,7 +3715,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
|
||||
}
|
||||
|
||||
userMetaData := make(map[string]string)
|
||||
objMeta := p.loadObjectMetaData(bucket, object, userMetaData)
|
||||
objMeta := p.loadObjectMetaData(bucket, object, &fi, userMetaData)
|
||||
|
||||
b, err := p.meta.RetrieveAttribute(nil, bucket, object, etagkey)
|
||||
etag := string(b)
|
||||
@@ -3901,7 +3908,7 @@ func (p *Posix) CopyObject(ctx context.Context, input *s3.CopyObjectInput) (*s3.
|
||||
}
|
||||
|
||||
mdmap := make(map[string]string)
|
||||
p.loadObjectMetaData(srcBucket, srcObject, mdmap)
|
||||
p.loadObjectMetaData(srcBucket, srcObject, &fi, mdmap)
|
||||
|
||||
var etag string
|
||||
var version *string
|
||||
|
||||
@@ -20,13 +20,12 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||
@@ -35,6 +34,7 @@ import (
|
||||
"github.com/versity/versitygw/backend"
|
||||
"github.com/versity/versitygw/backend/meta"
|
||||
"github.com/versity/versitygw/backend/posix"
|
||||
"github.com/versity/versitygw/s3api/utils"
|
||||
"github.com/versity/versitygw/s3err"
|
||||
"github.com/versity/versitygw/s3response"
|
||||
)
|
||||
@@ -95,8 +95,13 @@ const (
|
||||
metaHdr = "X-Amz-Meta"
|
||||
contentTypeHdr = "content-type"
|
||||
contentEncHdr = "content-encoding"
|
||||
contentLangHdr = "content-language"
|
||||
contentDispHdr = "content-disposition"
|
||||
cacheCtrlHdr = "cache-control"
|
||||
expiresHdr = "expires"
|
||||
emptyMD5 = "d41d8cd98f00b204e9800998ecf8427e"
|
||||
etagkey = "etag"
|
||||
checksumsKey = "checksums"
|
||||
objectRetentionKey = "object-retention"
|
||||
objectLegalHoldKey = "object-legal-hold"
|
||||
)
|
||||
@@ -113,8 +118,6 @@ const (
|
||||
onameAttr = systemPrefix + "objname"
|
||||
flagskey = systemPrefix + "sam_flags"
|
||||
stagecopykey = systemPrefix + "sam_stagereq"
|
||||
|
||||
fsBlocksize = 4096
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -229,14 +232,39 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
|
||||
|
||||
objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))
|
||||
|
||||
checksums, err := s.retrieveChecksums(nil, bucket, filepath.Join(objdir, uploadID))
|
||||
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
|
||||
return nil, fmt.Errorf("get mp checksums: %w", err)
|
||||
}
|
||||
|
||||
// ChecksumType should be the same as specified on CreateMultipartUpload
|
||||
if input.ChecksumType != "" && checksums.Type != input.ChecksumType {
|
||||
checksumType := checksums.Type
|
||||
if checksumType == "" {
|
||||
checksumType = types.ChecksumType("null")
|
||||
}
|
||||
|
||||
return nil, s3err.GetChecksumTypeMismatchOnMpErr(checksumType)
|
||||
}
|
||||
|
||||
// check all parts ok
|
||||
last := len(parts) - 1
|
||||
partsize := int64(0)
|
||||
var totalsize int64
|
||||
|
||||
// The initialie values is the lower limit of partNumber: 0
|
||||
var partNumber int32
|
||||
for i, part := range parts {
|
||||
if part.PartNumber == nil || *part.PartNumber < 1 {
|
||||
if part.PartNumber == nil {
|
||||
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
|
||||
}
|
||||
if *part.PartNumber < 1 {
|
||||
return nil, s3err.GetAPIError(s3err.ErrInvalidCompleteMpPartNumber)
|
||||
}
|
||||
if *part.PartNumber <= partNumber {
|
||||
return nil, s3err.GetAPIError(s3err.ErrInvalidPartOrder)
|
||||
}
|
||||
|
||||
partNumber = *part.PartNumber
|
||||
|
||||
partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
|
||||
fullPartPath := filepath.Join(bucket, partObjPath)
|
||||
@@ -245,20 +273,11 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
|
||||
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
|
||||
}
|
||||
|
||||
if i == 0 {
|
||||
partsize = fi.Size()
|
||||
}
|
||||
|
||||
// partsize must be a multiple of the filesystem blocksize
|
||||
// except for last part
|
||||
if i < last && partsize%fsBlocksize != 0 {
|
||||
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
|
||||
}
|
||||
|
||||
totalsize += fi.Size()
|
||||
// all parts except the last need to be the same size
|
||||
if i < last && partsize != fi.Size() {
|
||||
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
|
||||
// all parts except the last need to be greater, thena
|
||||
// the minimum allowed size (5 Mib)
|
||||
if i < last && fi.Size() < backend.MinPartSize {
|
||||
return nil, s3err.GetAPIError(s3err.ErrEntityTooSmall)
|
||||
}
|
||||
|
||||
b, err := s.meta.RetrieveAttribute(nil, bucket, partObjPath, etagkey)
|
||||
@@ -269,6 +288,21 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
|
||||
if parts[i].ETag == nil || etag != *parts[i].ETag {
|
||||
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
|
||||
}
|
||||
|
||||
partChecksum, err := s.retrieveChecksums(nil, bucket, partObjPath)
|
||||
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
|
||||
return nil, fmt.Errorf("get part checksum: %w", err)
|
||||
}
|
||||
|
||||
// If checksum has been provided on mp initalization
|
||||
err = validatePartChecksum(partChecksum, part)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if input.MpuObjectSize != nil && totalsize != *input.MpuObjectSize {
|
||||
return nil, s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize)
|
||||
}
|
||||
|
||||
// use totalsize=0 because we wont be writing to the file, only moving
|
||||
@@ -306,7 +340,11 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
|
||||
|
||||
userMetaData := make(map[string]string)
|
||||
upiddir := filepath.Join(objdir, uploadID)
|
||||
cType, _ := s.loadUserMetaData(bucket, upiddir, userMetaData)
|
||||
objMeta := s.loadUserMetaData(bucket, upiddir, userMetaData)
|
||||
err = s.storeObjectMetadata(f.File(), bucket, object, objMeta)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objname := filepath.Join(bucket, object)
|
||||
dir := filepath.Dir(objname)
|
||||
@@ -337,14 +375,6 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
|
||||
}
|
||||
}
|
||||
|
||||
// set content-type
|
||||
if cType != "" {
|
||||
err := s.meta.StoreAttribute(f.File(), bucket, object, contentTypeHdr, []byte(cType))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("set object content type: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// load and set legal hold
|
||||
lHold, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, objectLegalHoldKey)
|
||||
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
|
||||
@@ -395,6 +425,125 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *ScoutFS) storeObjectMetadata(f *os.File, bucket, object string, m objectMetadata) error {
|
||||
if getString(m.ContentType) != "" {
|
||||
err := s.meta.StoreAttribute(f, bucket, object, contentTypeHdr, []byte(*m.ContentType))
|
||||
if err != nil {
|
||||
return fmt.Errorf("set content-type: %w", err)
|
||||
}
|
||||
}
|
||||
if getString(m.ContentEncoding) != "" {
|
||||
err := s.meta.StoreAttribute(f, bucket, object, contentEncHdr, []byte(*m.ContentEncoding))
|
||||
if err != nil {
|
||||
return fmt.Errorf("set content-encoding: %w", err)
|
||||
}
|
||||
}
|
||||
if getString(m.ContentDisposition) != "" {
|
||||
err := s.meta.StoreAttribute(f, bucket, object, contentDispHdr, []byte(*m.ContentDisposition))
|
||||
if err != nil {
|
||||
return fmt.Errorf("set content-disposition: %w", err)
|
||||
}
|
||||
}
|
||||
if getString(m.ContentLanguage) != "" {
|
||||
err := s.meta.StoreAttribute(f, bucket, object, contentLangHdr, []byte(*m.ContentLanguage))
|
||||
if err != nil {
|
||||
return fmt.Errorf("set content-language: %w", err)
|
||||
}
|
||||
}
|
||||
if getString(m.CacheControl) != "" {
|
||||
err := s.meta.StoreAttribute(f, bucket, object, cacheCtrlHdr, []byte(*m.CacheControl))
|
||||
if err != nil {
|
||||
return fmt.Errorf("set cache-control: %w", err)
|
||||
}
|
||||
}
|
||||
if getString(m.Expires) != "" {
|
||||
err := s.meta.StoreAttribute(f, bucket, object, expiresHdr, []byte(*m.Expires))
|
||||
if err != nil {
|
||||
return fmt.Errorf("set cache-control: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func validatePartChecksum(checksum s3response.Checksum, part types.CompletedPart) error {
|
||||
n := numberOfChecksums(part)
|
||||
if n > 1 {
|
||||
return s3err.GetAPIError(s3err.ErrInvalidChecksumPart)
|
||||
}
|
||||
if checksum.Algorithm == "" {
|
||||
if n != 0 {
|
||||
return s3err.GetAPIError(s3err.ErrInvalidPart)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
algo := checksum.Algorithm
|
||||
if n == 0 {
|
||||
return s3err.APIError{
|
||||
Code: "InvalidRequest",
|
||||
Description: fmt.Sprintf("The upload was created using a %v checksum. The complete request must include the checksum for each part. It was missing for part %v in the request.", strings.ToLower(string(algo)), *part.PartNumber),
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
}
|
||||
}
|
||||
|
||||
for _, cs := range []struct {
|
||||
checksum *string
|
||||
expectedChecksum string
|
||||
algo types.ChecksumAlgorithm
|
||||
}{
|
||||
{part.ChecksumCRC32, getString(checksum.CRC32), types.ChecksumAlgorithmCrc32},
|
||||
{part.ChecksumCRC32C, getString(checksum.CRC32C), types.ChecksumAlgorithmCrc32c},
|
||||
{part.ChecksumSHA1, getString(checksum.SHA1), types.ChecksumAlgorithmSha1},
|
||||
{part.ChecksumSHA256, getString(checksum.SHA256), types.ChecksumAlgorithmSha256},
|
||||
{part.ChecksumCRC64NVME, getString(checksum.CRC64NVME), types.ChecksumAlgorithmCrc64nvme},
|
||||
} {
|
||||
if cs.checksum == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if !utils.IsValidChecksum(*cs.checksum, cs.algo) {
|
||||
return s3err.GetAPIError(s3err.ErrInvalidChecksumPart)
|
||||
}
|
||||
|
||||
if *cs.checksum != cs.expectedChecksum {
|
||||
if algo == cs.algo {
|
||||
return s3err.GetAPIError(s3err.ErrInvalidPart)
|
||||
}
|
||||
|
||||
return s3err.APIError{
|
||||
Code: "BadDigest",
|
||||
Description: fmt.Sprintf("The %v you specified for part %v did not match what we received.", strings.ToLower(string(cs.algo)), *part.PartNumber),
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func numberOfChecksums(part types.CompletedPart) int {
|
||||
counter := 0
|
||||
if getString(part.ChecksumCRC32) != "" {
|
||||
counter++
|
||||
}
|
||||
if getString(part.ChecksumCRC32C) != "" {
|
||||
counter++
|
||||
}
|
||||
if getString(part.ChecksumSHA1) != "" {
|
||||
counter++
|
||||
}
|
||||
if getString(part.ChecksumSHA256) != "" {
|
||||
counter++
|
||||
}
|
||||
if getString(part.ChecksumCRC64NVME) != "" {
|
||||
counter++
|
||||
}
|
||||
|
||||
return counter
|
||||
}
|
||||
|
||||
func (s *ScoutFS) checkUploadIDExists(bucket, object, uploadID string) ([32]byte, error) {
|
||||
sum := sha256.Sum256([]byte(object))
|
||||
objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))
|
||||
@@ -409,12 +558,21 @@ func (s *ScoutFS) checkUploadIDExists(bucket, object, uploadID string) ([32]byte
|
||||
return sum, nil
|
||||
}
|
||||
|
||||
type objectMetadata struct {
|
||||
ContentType *string
|
||||
ContentEncoding *string
|
||||
ContentDisposition *string
|
||||
ContentLanguage *string
|
||||
CacheControl *string
|
||||
Expires *string
|
||||
}
|
||||
|
||||
// fll out the user metadata map with the metadata for the object
|
||||
// and return the content type and encoding
|
||||
func (s *ScoutFS) loadUserMetaData(bucket, object string, m map[string]string) (string, string) {
|
||||
func (s *ScoutFS) loadUserMetaData(bucket, object string, m map[string]string) objectMetadata {
|
||||
ents, err := s.meta.ListAttributes(bucket, object)
|
||||
if err != nil || len(ents) == 0 {
|
||||
return "", ""
|
||||
return objectMetadata{}
|
||||
}
|
||||
for _, e := range ents {
|
||||
if !isValidMeta(e) {
|
||||
@@ -431,20 +589,39 @@ func (s *ScoutFS) loadUserMetaData(bucket, object string, m map[string]string) (
|
||||
m[strings.TrimPrefix(e, fmt.Sprintf("%v.", metaHdr))] = string(b)
|
||||
}
|
||||
|
||||
var contentType, contentEncoding string
|
||||
b, _ := s.meta.RetrieveAttribute(nil, bucket, object, contentTypeHdr)
|
||||
contentType = string(b)
|
||||
if contentType != "" {
|
||||
m[contentTypeHdr] = contentType
|
||||
var result objectMetadata
|
||||
|
||||
b, err := s.meta.RetrieveAttribute(nil, bucket, object, contentTypeHdr)
|
||||
if err == nil {
|
||||
result.ContentType = backend.GetPtrFromString(string(b))
|
||||
}
|
||||
|
||||
b, _ = s.meta.RetrieveAttribute(nil, bucket, object, contentEncHdr)
|
||||
contentEncoding = string(b)
|
||||
if contentEncoding != "" {
|
||||
m[contentEncHdr] = contentEncoding
|
||||
b, err = s.meta.RetrieveAttribute(nil, bucket, object, contentEncHdr)
|
||||
if err == nil {
|
||||
result.ContentEncoding = backend.GetPtrFromString(string(b))
|
||||
}
|
||||
|
||||
return contentType, contentEncoding
|
||||
b, err = s.meta.RetrieveAttribute(nil, bucket, object, contentDispHdr)
|
||||
if err == nil {
|
||||
result.ContentDisposition = backend.GetPtrFromString(string(b))
|
||||
}
|
||||
|
||||
b, err = s.meta.RetrieveAttribute(nil, bucket, object, contentLangHdr)
|
||||
if err == nil {
|
||||
result.ContentLanguage = backend.GetPtrFromString(string(b))
|
||||
}
|
||||
|
||||
b, err = s.meta.RetrieveAttribute(nil, bucket, object, cacheCtrlHdr)
|
||||
if err == nil {
|
||||
result.CacheControl = backend.GetPtrFromString(string(b))
|
||||
}
|
||||
|
||||
b, err = s.meta.RetrieveAttribute(nil, bucket, object, expiresHdr)
|
||||
if err == nil {
|
||||
result.Expires = backend.GetPtrFromString(string(b))
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func isValidMeta(val string) bool {
|
||||
@@ -458,99 +635,17 @@ func isValidMeta(val string) bool {
|
||||
}
|
||||
|
||||
func (s *ScoutFS) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
|
||||
if input.Bucket == nil {
|
||||
return nil, s3err.GetAPIError(s3err.ErrInvalidBucketName)
|
||||
}
|
||||
if input.Key == nil {
|
||||
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
||||
}
|
||||
bucket := *input.Bucket
|
||||
object := *input.Key
|
||||
|
||||
if input.PartNumber != nil {
|
||||
uploadId, sum, err := s.retrieveUploadId(bucket, object)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ents, err := os.ReadDir(filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadId))
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read parts: %w", err)
|
||||
}
|
||||
|
||||
partPath := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadId, fmt.Sprintf("%v", *input.PartNumber))
|
||||
|
||||
part, err := os.Stat(filepath.Join(bucket, partPath))
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
|
||||
}
|
||||
if errors.Is(err, syscall.ENAMETOOLONG) {
|
||||
return nil, s3err.GetAPIError(s3err.ErrKeyTooLong)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stat part: %w", err)
|
||||
}
|
||||
|
||||
b, err := s.meta.RetrieveAttribute(nil, bucket, partPath, etagkey)
|
||||
etag := string(b)
|
||||
if err != nil {
|
||||
etag = ""
|
||||
}
|
||||
partsCount := int32(len(ents))
|
||||
size := part.Size()
|
||||
|
||||
return &s3.HeadObjectOutput{
|
||||
LastModified: backend.GetTimePtr(part.ModTime()),
|
||||
ETag: &etag,
|
||||
PartsCount: &partsCount,
|
||||
ContentLength: &size,
|
||||
}, nil
|
||||
}
|
||||
|
||||
_, err := os.Stat(bucket)
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
|
||||
}
|
||||
res, err := s.Posix.HeadObject(ctx, input)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stat bucket: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objPath := filepath.Join(bucket, object)
|
||||
|
||||
fi, err := os.Stat(objPath)
|
||||
if errors.Is(err, fs.ErrNotExist) || errors.Is(err, syscall.ENOTDIR) {
|
||||
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
||||
}
|
||||
if errors.Is(err, syscall.ENAMETOOLONG) {
|
||||
return nil, s3err.GetAPIError(s3err.ErrKeyTooLong)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stat object: %w", err)
|
||||
}
|
||||
if strings.HasSuffix(object, "/") && !fi.IsDir() {
|
||||
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
||||
}
|
||||
|
||||
userMetaData := make(map[string]string)
|
||||
contentType, contentEncoding := s.loadUserMetaData(bucket, object, userMetaData)
|
||||
|
||||
if fi.IsDir() {
|
||||
// this is the media type for directories in AWS and Nextcloud
|
||||
contentType = "application/x-directory"
|
||||
}
|
||||
|
||||
b, err := s.meta.RetrieveAttribute(nil, bucket, object, etagkey)
|
||||
etag := string(b)
|
||||
if err != nil {
|
||||
etag = ""
|
||||
}
|
||||
|
||||
stclass := types.StorageClassStandard
|
||||
requestOngoing := ""
|
||||
if s.glaciermode {
|
||||
objPath := filepath.Join(*input.Bucket, *input.Key)
|
||||
|
||||
stclass := types.StorageClassStandard
|
||||
requestOngoing := ""
|
||||
|
||||
requestOngoing = stageComplete
|
||||
|
||||
// Check if there are any offline exents associated with this file.
|
||||
@@ -577,62 +672,17 @@ func (s *ScoutFS) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s
|
||||
requestOngoing = stageInProgress
|
||||
}
|
||||
}
|
||||
|
||||
res.Restore = &requestOngoing
|
||||
res.StorageClass = stclass
|
||||
}
|
||||
|
||||
contentLength := fi.Size()
|
||||
|
||||
var objectLockLegalHoldStatus types.ObjectLockLegalHoldStatus
|
||||
status, err := s.Posix.GetObjectLegalHold(ctx, bucket, object, *input.VersionId)
|
||||
if err == nil {
|
||||
if *status {
|
||||
objectLockLegalHoldStatus = types.ObjectLockLegalHoldStatusOn
|
||||
} else {
|
||||
objectLockLegalHoldStatus = types.ObjectLockLegalHoldStatusOff
|
||||
}
|
||||
}
|
||||
|
||||
var objectLockMode types.ObjectLockMode
|
||||
var objectLockRetainUntilDate *time.Time
|
||||
retention, err := s.Posix.GetObjectRetention(ctx, bucket, object, *input.VersionId)
|
||||
if err == nil {
|
||||
var config types.ObjectLockRetention
|
||||
if err := json.Unmarshal(retention, &config); err == nil {
|
||||
objectLockMode = types.ObjectLockMode(config.Mode)
|
||||
objectLockRetainUntilDate = config.RetainUntilDate
|
||||
}
|
||||
}
|
||||
|
||||
return &s3.HeadObjectOutput{
|
||||
ContentLength: &contentLength,
|
||||
ContentType: &contentType,
|
||||
ContentEncoding: &contentEncoding,
|
||||
ETag: &etag,
|
||||
LastModified: backend.GetTimePtr(fi.ModTime()),
|
||||
Metadata: userMetaData,
|
||||
StorageClass: stclass,
|
||||
Restore: &requestOngoing,
|
||||
ObjectLockLegalHoldStatus: objectLockLegalHoldStatus,
|
||||
ObjectLockMode: objectLockMode,
|
||||
ObjectLockRetainUntilDate: objectLockRetainUntilDate,
|
||||
}, nil
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (s *ScoutFS) retrieveUploadId(bucket, object string) (string, [32]byte, error) {
|
||||
sum := sha256.Sum256([]byte(object))
|
||||
objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))
|
||||
|
||||
entries, err := os.ReadDir(objdir)
|
||||
if err != nil || len(entries) == 0 {
|
||||
return "", [32]byte{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
||||
}
|
||||
|
||||
return entries[0].Name(), sum, nil
|
||||
}
|
||||
|
||||
func (s *ScoutFS) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
|
||||
func (s *ScoutFS) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
|
||||
bucket := *input.Bucket
|
||||
object := *input.Key
|
||||
acceptRange := *input.Range
|
||||
|
||||
_, err := os.Stat(bucket)
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
@@ -659,23 +709,6 @@ func (s *ScoutFS) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.Ge
|
||||
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
||||
}
|
||||
|
||||
objSize := fi.Size()
|
||||
startOffset, length, isValid, err := backend.ParseGetObjectRange(objSize, acceptRange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if fi.IsDir() {
|
||||
// directory objects are always 0 len
|
||||
objSize = 0
|
||||
length = 0
|
||||
}
|
||||
|
||||
var contentRange string
|
||||
if isValid {
|
||||
contentRange = fmt.Sprintf("bytes %v-%v/%v", startOffset, startOffset+length-1, objSize)
|
||||
}
|
||||
|
||||
if s.glaciermode {
|
||||
// Check if there are any offline exents associated with this file.
|
||||
// If so, we will return the InvalidObjectState error.
|
||||
@@ -691,67 +724,7 @@ func (s *ScoutFS) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.Ge
|
||||
}
|
||||
}
|
||||
|
||||
f, err := os.Open(objPath)
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open object: %w", err)
|
||||
}
|
||||
|
||||
rdr := io.NewSectionReader(f, startOffset, length)
|
||||
|
||||
userMetaData := make(map[string]string)
|
||||
|
||||
contentType, contentEncoding := s.loadUserMetaData(bucket, object, userMetaData)
|
||||
|
||||
b, err := s.meta.RetrieveAttribute(nil, bucket, object, etagkey)
|
||||
etag := string(b)
|
||||
if err != nil {
|
||||
etag = ""
|
||||
}
|
||||
|
||||
tags, err := s.getXattrTags(bucket, object)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get object tags: %w", err)
|
||||
}
|
||||
|
||||
tagCount := int32(len(tags))
|
||||
|
||||
return &s3.GetObjectOutput{
|
||||
AcceptRanges: backend.GetPtrFromString("bytes"),
|
||||
ContentLength: &length,
|
||||
ContentEncoding: &contentEncoding,
|
||||
ContentType: &contentType,
|
||||
ETag: &etag,
|
||||
LastModified: backend.GetTimePtr(fi.ModTime()),
|
||||
Metadata: userMetaData,
|
||||
TagCount: &tagCount,
|
||||
StorageClass: types.StorageClassStandard,
|
||||
ContentRange: &contentRange,
|
||||
Body: &backend.FileSectionReadCloser{R: rdr, F: f},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *ScoutFS) getXattrTags(bucket, object string) (map[string]string, error) {
|
||||
tags := make(map[string]string)
|
||||
b, err := xattr.Get(filepath.Join(bucket, object), "user."+tagHdr)
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
||||
}
|
||||
if isNoAttr(err) {
|
||||
return tags, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get tags: %w", err)
|
||||
}
|
||||
|
||||
err = json.Unmarshal(b, &tags)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unmarshal tags: %w", err)
|
||||
}
|
||||
|
||||
return tags, nil
|
||||
return s.Posix.GetObject(ctx, input)
|
||||
}
|
||||
|
||||
func (s *ScoutFS) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (s3response.ListObjectsResult, error) {
|
||||
@@ -877,17 +850,24 @@ func (s *ScoutFS) fileToObj(bucket string) backend.GetObjFunc {
|
||||
return s3response.Object{}, fmt.Errorf("get fileinfo: %w", err)
|
||||
}
|
||||
|
||||
key := path + "/"
|
||||
size := int64(0)
|
||||
mtime := fi.ModTime()
|
||||
|
||||
return s3response.Object{
|
||||
ETag: &etag,
|
||||
Key: &key,
|
||||
Key: &path,
|
||||
LastModified: &mtime,
|
||||
Size: &size,
|
||||
StorageClass: types.ObjectStorageClassStandard,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Retreive the object checksum algorithm
|
||||
checksums, err := s.retrieveChecksums(nil, bucket, path)
|
||||
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
|
||||
return s3response.Object{}, backend.ErrSkipObj
|
||||
}
|
||||
|
||||
// file object, get object info and fill out object data
|
||||
b, err := s.meta.RetrieveAttribute(nil, bucket, path, etagkey)
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
@@ -929,15 +909,27 @@ func (s *ScoutFS) fileToObj(bucket string) backend.GetObjFunc {
|
||||
mtime := fi.ModTime()
|
||||
|
||||
return s3response.Object{
|
||||
ETag: &etag,
|
||||
Key: &path,
|
||||
LastModified: &mtime,
|
||||
Size: &size,
|
||||
StorageClass: sc,
|
||||
ETag: &etag,
|
||||
Key: &path,
|
||||
LastModified: &mtime,
|
||||
Size: &size,
|
||||
StorageClass: sc,
|
||||
ChecksumAlgorithm: []types.ChecksumAlgorithm{checksums.Algorithm},
|
||||
ChecksumType: checksums.Type,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ScoutFS) retrieveChecksums(f *os.File, bucket, object string) (checksums s3response.Checksum, err error) {
|
||||
checksumsAtr, err := s.meta.RetrieveAttribute(f, bucket, object, checksumsKey)
|
||||
if err != nil {
|
||||
return checksums, err
|
||||
}
|
||||
|
||||
err = json.Unmarshal(checksumsAtr, &checksums)
|
||||
return checksums, err
|
||||
}
|
||||
|
||||
// RestoreObject will set stage request on file if offline and do nothing if
|
||||
// file is online
|
||||
func (s *ScoutFS) RestoreObject(_ context.Context, input *s3.RestoreObjectInput) error {
|
||||
@@ -963,6 +955,13 @@ func (s *ScoutFS) RestoreObject(_ context.Context, input *s3.RestoreObjectInput)
|
||||
return nil
|
||||
}
|
||||
|
||||
func getString(str *string) string {
|
||||
if str == nil {
|
||||
return ""
|
||||
}
|
||||
return *str
|
||||
}
|
||||
|
||||
func isStaging(objname string) (bool, error) {
|
||||
b, err := xattr.Get(objname, flagskey)
|
||||
if err != nil && !isNoAttr(err) {
|
||||
|
||||
@@ -88,7 +88,6 @@ func Walk(ctx context.Context, fileSystem fs.FS, prefix, delimiter, marker strin
|
||||
// so we can skip a directory without an early return
|
||||
var skipflag error
|
||||
if d.IsDir() {
|
||||
fmt.Println("path: ", path)
|
||||
// If prefix is defined and the directory does not match prefix,
|
||||
// do not descend into the directory because nothing will
|
||||
// match this prefix. Make sure to append the / at the end of
|
||||
|
||||
Reference in New Issue
Block a user