|
|
|
|
@@ -16,8 +16,10 @@ package scoutfs
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"crypto/sha256"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
|
|
|
|
"io"
|
|
|
|
|
"io/fs"
|
|
|
|
|
"os"
|
|
|
|
|
"path/filepath"
|
|
|
|
|
@@ -37,6 +39,17 @@ type ScoutFS struct {
|
|
|
|
|
*posix.Posix
|
|
|
|
|
rootfd *os.File
|
|
|
|
|
rootdir string
|
|
|
|
|
|
|
|
|
|
// glaciermode enables the following behavior:
|
|
|
|
|
// GET object: if file offline, return invalid object state
|
|
|
|
|
// HEAD object: if file offline, set obj storage class to GLACIER
|
|
|
|
|
// if file offline and staging, x-amz-restore: ongoing-request="true"
|
|
|
|
|
// if file offline and not staging, x-amz-restore: ongoing-request="false"
|
|
|
|
|
// if file online, x-amz-restore: ongoing-request="false", expiry-date="Fri, 2 Dec 2050 00:00:00 GMT"
|
|
|
|
|
// note: this expiry-date is not used but provided for client glacier compatibility
|
|
|
|
|
// ListObjects: if file offline, set obj storage class to GLACIER
|
|
|
|
|
// RestoreObject: add batch stage request to file
|
|
|
|
|
glaciermode bool
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var _ backend.Backend = &ScoutFS{}
|
|
|
|
|
@@ -44,11 +57,50 @@ var _ backend.Backend = &ScoutFS{}
|
|
|
|
|
const (
|
|
|
|
|
metaTmpDir = ".sgwtmp"
|
|
|
|
|
metaTmpMultipartDir = metaTmpDir + "/multipart"
|
|
|
|
|
onameAttr = "user.objname"
|
|
|
|
|
tagHdr = "X-Amz-Tagging"
|
|
|
|
|
emptyMD5 = "d41d8cd98f00b204e9800998ecf8427e"
|
|
|
|
|
etagkey = "user.etag"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
stageComplete = "ongoing-request=\"false\", expiry-date=\"Fri, 2 Dec 2050 00:00:00 GMT\""
|
|
|
|
|
stageInProgress = "true"
|
|
|
|
|
stageNotInProgress = "false"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
// ScoutFS special xattr types
|
|
|
|
|
|
|
|
|
|
systemPrefix = "scoutfs.hide."
|
|
|
|
|
onameAttr = systemPrefix + "objname"
|
|
|
|
|
flagskey = systemPrefix + "sam_flags"
|
|
|
|
|
stagecopykey = systemPrefix + "sam_stagereq"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
// ScoutAM Flags
|
|
|
|
|
|
|
|
|
|
// Staging - file requested stage
|
|
|
|
|
Staging uint64 = 1 << iota
|
|
|
|
|
// StageFail - all copies failed to stage
|
|
|
|
|
StageFail
|
|
|
|
|
// NoArchive - no archive copies of file should be made
|
|
|
|
|
NoArchive
|
|
|
|
|
// ExtCacheRequested means file policy requests Ext Cache
|
|
|
|
|
ExtCacheRequested
|
|
|
|
|
// ExtCacheDone means this file ext cache copy has been
|
|
|
|
|
// created already (and possibly pruned, so may not exist)
|
|
|
|
|
ExtCacheDone
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Option sets various options for scoutfs
|
|
|
|
|
type Option func(s *ScoutFS)
|
|
|
|
|
|
|
|
|
|
// WithGlacierEmulation sets glacier mode emulation
|
|
|
|
|
func WithGlacierEmulation() Option {
|
|
|
|
|
return func(s *ScoutFS) { s.glaciermode = true }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ScoutFS) Shutdown() {
|
|
|
|
|
s.Posix.Shutdown()
|
|
|
|
|
s.rootfd.Close()
|
|
|
|
|
@@ -62,7 +114,7 @@ func (*ScoutFS) String() string {
|
|
|
|
|
// CompleteMultipartUpload scoutfs complete upload uses scoutfs move blocks
|
|
|
|
|
// ioctl to not have to read and copy the part data to the final object. This
|
|
|
|
|
// saves a read and write cycle for all mutlipart uploads.
|
|
|
|
|
func (p *ScoutFS) CompleteMultipartUpload(bucket, object, uploadID string, parts []types.Part) (*s3.CompleteMultipartUploadOutput, error) {
|
|
|
|
|
func (s *ScoutFS) CompleteMultipartUpload(bucket, object, uploadID string, parts []types.Part) (*s3.CompleteMultipartUploadOutput, error) {
|
|
|
|
|
_, err := os.Stat(bucket)
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
|
|
|
|
|
@@ -71,7 +123,7 @@ func (p *ScoutFS) CompleteMultipartUpload(bucket, object, uploadID string, parts
|
|
|
|
|
return nil, fmt.Errorf("stat bucket: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sum, err := p.checkUploadIDExists(bucket, object, uploadID)
|
|
|
|
|
sum, err := s.checkUploadIDExists(bucket, object, uploadID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
@@ -185,7 +237,7 @@ func (p *ScoutFS) CompleteMultipartUpload(bucket, object, uploadID string, parts
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *ScoutFS) checkUploadIDExists(bucket, object, uploadID string) ([32]byte, error) {
|
|
|
|
|
func (s *ScoutFS) checkUploadIDExists(bucket, object, uploadID string) ([32]byte, error) {
|
|
|
|
|
sum := sha256.Sum256([]byte(object))
|
|
|
|
|
objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))
|
|
|
|
|
|
|
|
|
|
@@ -294,3 +346,398 @@ func mkdirAll(path string, perm os.FileMode, bucket, object string) error {
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ScoutFS) HeadObject(bucket, object string) (*s3.HeadObjectOutput, error) {
|
|
|
|
|
_, err := os.Stat(bucket)
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("stat bucket: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
objPath := filepath.Join(bucket, object)
|
|
|
|
|
fi, err := os.Stat(objPath)
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("stat object: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
userMetaData := make(map[string]string)
|
|
|
|
|
contentType, contentEncoding := loadUserMetaData(objPath, userMetaData)
|
|
|
|
|
|
|
|
|
|
b, err := xattr.Get(objPath, etagkey)
|
|
|
|
|
etag := string(b)
|
|
|
|
|
if err != nil {
|
|
|
|
|
etag = ""
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
stclass := types.StorageClassStandard
|
|
|
|
|
requestOngoing := ""
|
|
|
|
|
if s.glaciermode {
|
|
|
|
|
requestOngoing = stageComplete
|
|
|
|
|
|
|
|
|
|
// Check if there are any offline exents associated with this file.
|
|
|
|
|
// If so, we will set storage class to glacier.
|
|
|
|
|
st, err := scoutfs.StatMore(objPath)
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("stat more: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if st.Offline_blocks != 0 {
|
|
|
|
|
stclass = types.StorageClassGlacier
|
|
|
|
|
requestOngoing = stageNotInProgress
|
|
|
|
|
|
|
|
|
|
ok, err := isStaging(objPath)
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("check stage status: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if ok {
|
|
|
|
|
requestOngoing = stageInProgress
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return &s3.HeadObjectOutput{
|
|
|
|
|
ContentLength: fi.Size(),
|
|
|
|
|
ContentType: &contentType,
|
|
|
|
|
ContentEncoding: &contentEncoding,
|
|
|
|
|
ETag: &etag,
|
|
|
|
|
LastModified: backend.GetTimePtr(fi.ModTime()),
|
|
|
|
|
Metadata: userMetaData,
|
|
|
|
|
StorageClass: stclass,
|
|
|
|
|
Restore: &requestOngoing,
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ScoutFS) GetObject(bucket, object, acceptRange string, writer io.Writer) (*s3.GetObjectOutput, error) {
|
|
|
|
|
_, err := os.Stat(bucket)
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("stat bucket: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
objPath := filepath.Join(bucket, object)
|
|
|
|
|
fi, err := os.Stat(objPath)
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("stat object: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
startOffset, length, err := backend.ParseRange(fi, acceptRange)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if startOffset+length > fi.Size() {
|
|
|
|
|
// TODO: is ErrInvalidRequest correct here?
|
|
|
|
|
return nil, s3err.GetAPIError(s3err.ErrInvalidRequest)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if s.glaciermode {
|
|
|
|
|
// Check if there are any offline exents associated with this file.
|
|
|
|
|
// If so, we will return the InvalidObjectState error.
|
|
|
|
|
st, err := scoutfs.StatMore(objPath)
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("stat more: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if st.Offline_blocks != 0 {
|
|
|
|
|
return nil, s3err.GetAPIError(s3err.ErrInvalidObjectState)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
f, err := os.Open(objPath)
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("open object: %w", err)
|
|
|
|
|
}
|
|
|
|
|
defer f.Close()
|
|
|
|
|
|
|
|
|
|
rdr := io.NewSectionReader(f, startOffset, length)
|
|
|
|
|
_, err = io.Copy(writer, rdr)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("copy data: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
userMetaData := make(map[string]string)
|
|
|
|
|
|
|
|
|
|
contentType, contentEncoding := loadUserMetaData(objPath, userMetaData)
|
|
|
|
|
|
|
|
|
|
b, err := xattr.Get(objPath, etagkey)
|
|
|
|
|
etag := string(b)
|
|
|
|
|
if err != nil {
|
|
|
|
|
etag = ""
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tags, err := s.getXattrTags(bucket, object)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("get object tags: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return &s3.GetObjectOutput{
|
|
|
|
|
AcceptRanges: &acceptRange,
|
|
|
|
|
ContentLength: length,
|
|
|
|
|
ContentEncoding: &contentEncoding,
|
|
|
|
|
ContentType: &contentType,
|
|
|
|
|
ETag: &etag,
|
|
|
|
|
LastModified: backend.GetTimePtr(fi.ModTime()),
|
|
|
|
|
Metadata: userMetaData,
|
|
|
|
|
TagCount: int32(len(tags)),
|
|
|
|
|
StorageClass: types.StorageClassStandard,
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ScoutFS) getXattrTags(bucket, object string) (map[string]string, error) {
|
|
|
|
|
tags := make(map[string]string)
|
|
|
|
|
b, err := xattr.Get(filepath.Join(bucket, object), "user."+tagHdr)
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
|
|
|
|
}
|
|
|
|
|
if isNoAttr(err) {
|
|
|
|
|
return tags, nil
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("get tags: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err = json.Unmarshal(b, &tags)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("unmarshal tags: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return tags, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ScoutFS) ListObjects(bucket, prefix, marker, delim string, maxkeys int) (*s3.ListObjectsOutput, error) {
|
|
|
|
|
_, err := os.Stat(bucket)
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("stat bucket: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fileSystem := os.DirFS(bucket)
|
|
|
|
|
results, err := backend.Walk(fileSystem, prefix, delim, marker, maxkeys,
|
|
|
|
|
s.fileToObj(bucket), []string{metaTmpDir})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("walk %v: %w", bucket, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return &s3.ListObjectsOutput{
|
|
|
|
|
CommonPrefixes: results.CommonPrefixes,
|
|
|
|
|
Contents: results.Objects,
|
|
|
|
|
Delimiter: &delim,
|
|
|
|
|
IsTruncated: results.Truncated,
|
|
|
|
|
Marker: &marker,
|
|
|
|
|
MaxKeys: int32(maxkeys),
|
|
|
|
|
Name: &bucket,
|
|
|
|
|
NextMarker: &results.NextMarker,
|
|
|
|
|
Prefix: &prefix,
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ScoutFS) ListObjectsV2(bucket, prefix, marker, delim string, maxkeys int) (*s3.ListObjectsV2Output, error) {
|
|
|
|
|
_, err := os.Stat(bucket)
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("stat bucket: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fileSystem := os.DirFS(bucket)
|
|
|
|
|
results, err := backend.Walk(fileSystem, prefix, delim, marker, maxkeys,
|
|
|
|
|
s.fileToObj(bucket), []string{metaTmpDir})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("walk %v: %w", bucket, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return &s3.ListObjectsV2Output{
|
|
|
|
|
CommonPrefixes: results.CommonPrefixes,
|
|
|
|
|
Contents: results.Objects,
|
|
|
|
|
Delimiter: &delim,
|
|
|
|
|
IsTruncated: results.Truncated,
|
|
|
|
|
ContinuationToken: &marker,
|
|
|
|
|
MaxKeys: int32(maxkeys),
|
|
|
|
|
Name: &bucket,
|
|
|
|
|
NextContinuationToken: &results.NextMarker,
|
|
|
|
|
Prefix: &prefix,
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ScoutFS) fileToObj(bucket string) backend.GetObjFunc {
|
|
|
|
|
return func(path string, d fs.DirEntry) (types.Object, error) {
|
|
|
|
|
objPath := filepath.Join(bucket, path)
|
|
|
|
|
if d.IsDir() {
|
|
|
|
|
// directory object only happens if directory empty
|
|
|
|
|
// check to see if this is a directory object by checking etag
|
|
|
|
|
etagBytes, err := xattr.Get(objPath, etagkey)
|
|
|
|
|
if isNoAttr(err) || errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return types.Object{}, backend.ErrSkipObj
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return types.Object{}, fmt.Errorf("get etag: %w", err)
|
|
|
|
|
}
|
|
|
|
|
etag := string(etagBytes)
|
|
|
|
|
|
|
|
|
|
fi, err := d.Info()
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return types.Object{}, backend.ErrSkipObj
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return types.Object{}, fmt.Errorf("get fileinfo: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return types.Object{
|
|
|
|
|
ETag: &etag,
|
|
|
|
|
Key: &path,
|
|
|
|
|
LastModified: backend.GetTimePtr(fi.ModTime()),
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// file object, get object info and fill out object data
|
|
|
|
|
etagBytes, err := xattr.Get(objPath, etagkey)
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return types.Object{}, backend.ErrSkipObj
|
|
|
|
|
}
|
|
|
|
|
if err != nil && !isNoAttr(err) {
|
|
|
|
|
return types.Object{}, fmt.Errorf("get etag: %w", err)
|
|
|
|
|
}
|
|
|
|
|
etag := string(etagBytes)
|
|
|
|
|
|
|
|
|
|
fi, err := d.Info()
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return types.Object{}, backend.ErrSkipObj
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return types.Object{}, fmt.Errorf("get fileinfo: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sc := types.ObjectStorageClassStandard
|
|
|
|
|
if s.glaciermode {
|
|
|
|
|
// Check if there are any offline exents associated with this file.
|
|
|
|
|
// If so, we will return the InvalidObjectState error.
|
|
|
|
|
st, err := scoutfs.StatMore(objPath)
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return types.Object{}, backend.ErrSkipObj
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return types.Object{}, fmt.Errorf("stat more: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if st.Offline_blocks != 0 {
|
|
|
|
|
sc = types.ObjectStorageClassGlacier
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return types.Object{
|
|
|
|
|
ETag: &etag,
|
|
|
|
|
Key: &path,
|
|
|
|
|
LastModified: backend.GetTimePtr(fi.ModTime()),
|
|
|
|
|
Size: fi.Size(),
|
|
|
|
|
StorageClass: sc,
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// RestoreObject will set stage request on file if offline and do nothing if
|
|
|
|
|
// file is online
|
|
|
|
|
func (s *ScoutFS) RestoreObject(bucket, object string, restoreRequest *s3.RestoreObjectInput) error {
|
|
|
|
|
_, err := os.Stat(bucket)
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return s3err.GetAPIError(s3err.ErrNoSuchBucket)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("stat bucket: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err = setStaging(filepath.Join(bucket, object))
|
|
|
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
|
|
|
return s3err.GetAPIError(s3err.ErrNoSuchKey)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("stage object: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func setStaging(objname string) error {
|
|
|
|
|
b, err := xattr.Get(objname, flagskey)
|
|
|
|
|
if err != nil && !isNoAttr(err) {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var oldflags uint64
|
|
|
|
|
if !isNoAttr(err) {
|
|
|
|
|
err = json.Unmarshal(b, &oldflags)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
newflags := oldflags | Staging
|
|
|
|
|
|
|
|
|
|
if newflags == oldflags {
|
|
|
|
|
// no flags change, just return
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return fSetNewGlobalFlags(objname, newflags)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func isStaging(objname string) (bool, error) {
|
|
|
|
|
b, err := xattr.Get(objname, flagskey)
|
|
|
|
|
if err != nil && !isNoAttr(err) {
|
|
|
|
|
return false, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var flags uint64
|
|
|
|
|
if !isNoAttr(err) {
|
|
|
|
|
err = json.Unmarshal(b, &flags)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return false, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return flags&Staging == Staging, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func fSetNewGlobalFlags(objname string, flags uint64) error {
|
|
|
|
|
b, err := json.Marshal(&flags)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return xattr.Set(objname, flagskey, b)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func isNoAttr(err error) bool {
|
|
|
|
|
if err == nil {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
xerr, ok := err.(*xattr.Error)
|
|
|
|
|
if ok && xerr.Err == xattr.ENOATTR {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
if err == syscall.ENODATA {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|