Compare commits

...

1 Commits

Author SHA1 Message Date
jonaustin09
24540ab17e fix: Fixes object version creatiion racing in posix 2024-11-06 14:59:01 -05:00
3 changed files with 71 additions and 12 deletions

View File

@@ -29,6 +29,8 @@ import (
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
@@ -71,6 +73,8 @@ type Posix struct {
// newDirPerm is the permission to set on newly created directories
newDirPerm fs.FileMode
objLocks *sync.Map
}
var _ backend.Backend = &Posix{}
@@ -172,6 +176,7 @@ func New(rootdir string, meta meta.MetadataStorer, opts PosixOpts) (*Posix, erro
bucketlinks: opts.BucketLinks,
versioningDir: verioningdirAbs,
newDirPerm: opts.NewDirPerm,
objLocks: &sync.Map{},
}, nil
}
@@ -2139,6 +2144,31 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput)
}, nil
}
type refCountedLock struct {
mu sync.Mutex
count int32
}
func (p *Posix) getLock(key string) *refCountedLock {
actual, _ := p.objLocks.LoadOrStore(key, &refCountedLock{count: 0})
lock := actual.(*refCountedLock)
// Increment counter if the lock already exists
if actual != nil {
atomic.AddInt32(&lock.count, 1)
}
return lock
}
func (p *Posix) releaseLock(key string, lock *refCountedLock) {
lock.mu.Unlock()
// Decrement the counter
if atomic.AddInt32(&lock.count, -1) == 0 {
p.objLocks.Delete(key)
}
}
func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3response.PutObjectOutput, error) {
acct, ok := ctx.Value("account").(auth.Account)
if !ok {
@@ -2228,13 +2258,20 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons
}
vEnabled := p.isBucketVersioningEnabled(vStatus)
// Lock the call, if it indicates a new object version creation
if vEnabled {
objLock := p.getLock(name)
objLock.mu.Lock()
defer p.releaseLock(name, objLock)
}
// object is file
d, err := os.Stat(name)
if err == nil && d.IsDir() {
return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrExistingObjectIsDirectory)
}
// if the versioninng is enabled first create the file object version
// if the versioning is enabled first create the file object version
if p.versioningEnabled() && vStatus != "" && err == nil {
var isVersionIdMissing bool
if p.isBucketVersioningSuspended(vStatus) {

View File

@@ -616,7 +616,7 @@ func TestVersioning(s *S3Conf) {
Versioning_WORM_obj_version_locked_with_governance_retention(s)
Versioning_WORM_obj_version_locked_with_compliance_retention(s)
// Concurrent requests
//Versioninig_concurrent_upload_object(s)
Versioning_concurrent_upload_object(s)
}
func TestVersioningDisabled(s *S3Conf) {

View File

@@ -27,6 +27,7 @@ import (
"net/http"
"net/url"
"regexp"
"sort"
"strings"
"sync"
"time"
@@ -13386,34 +13387,43 @@ func Versioning_concurrent_upload_object(s *S3Conf) error {
versionCount := 5
// Channel to collect errors
errCh := make(chan error, versionCount)
// Channel to collect object verisons
versionsCh := make(chan types.ObjectVersion, versionCount)
uploadVersion := func(wg *sync.WaitGroup) {
uploadVersion := func(wg *sync.WaitGroup, length int64) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
res, err := s3client.PutObject(ctx, &s3.PutObjectInput{
out, err := putObjectWithData(length, &s3.PutObjectInput{
Bucket: &bucket,
Key: &obj,
})
cancel()
}, s3client)
if err != nil {
// Send error to the channel
errCh <- err
return
}
fmt.Printf("uploaded object successfully: versionId: %v\n", *res.VersionId)
versionsCh <- types.ObjectVersion{
ETag: out.res.ETag,
StorageClass: types.ObjectVersionStorageClassStandard,
IsLatest: getBoolPtr(false),
Key: &obj,
VersionId: out.res.VersionId,
Size: &length,
}
fmt.Printf("uploaded object successfully: versionId: %v\n", *out.res.VersionId)
}
wg := &sync.WaitGroup{}
wg.Add(versionCount)
for i := 0; i < versionCount; i++ {
go uploadVersion(wg)
go uploadVersion(wg, int64(i*100))
}
wg.Wait()
close(errCh)
close(versionsCh)
// Check if there were any errors
for err := range errCh {
@@ -13423,6 +13433,18 @@ func Versioning_concurrent_upload_object(s *S3Conf) error {
}
}
versions := []types.ObjectVersion{}
for el := range versionsCh {
versions = append(versions, el)
}
sort.SliceStable(versions, func(i, j int) bool {
return *versions[i].VersionId > *versions[j].VersionId
})
versions[0].IsLatest = getBoolPtr(true)
// List object versions after all uploads
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
res, err := s3client.ListObjectVersions(ctx, &s3.ListObjectVersionsInput{
@@ -13433,8 +13455,8 @@ func Versioning_concurrent_upload_object(s *S3Conf) error {
return err
}
if len(res.Versions) != versionCount {
return fmt.Errorf("expected %v object versions, instead got %v", versionCount, len(res.Versions))
if !compareVersions(versions, res.Versions) {
return fmt.Errorf("expected the object versions to be %v, instead got %v", versions, res.Versions)
}
return nil