Mirror of https://github.com/versity/versitygw.git (synced 2026-01-25 20:42:02 +00:00)

Compare commits: ben/plugin ... fix/versio (1 commit)

Commit 24540ab17e
@@ -29,6 +29,8 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"syscall"
 	"time"
 
@@ -71,6 +73,8 @@ type Posix struct {
 
 	// newDirPerm is the permission to set on newly created directories
 	newDirPerm fs.FileMode
+
+	objLocks *sync.Map
 }
 
 var _ backend.Backend = &Posix{}
@@ -172,6 +176,7 @@ func New(rootdir string, meta meta.MetadataStorer, opts PosixOpts) (*Posix, erro
 		bucketlinks:   opts.BucketLinks,
 		versioningDir: verioningdirAbs,
 		newDirPerm:    opts.NewDirPerm,
+		objLocks:      &sync.Map{},
 	}, nil
 }
 
@@ -2139,6 +2144,31 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput)
 	}, nil
 }
 
+type refCountedLock struct {
+	mu    sync.Mutex
+	count int32
+}
+
+func (p *Posix) getLock(key string) *refCountedLock {
+	actual, _ := p.objLocks.LoadOrStore(key, &refCountedLock{count: 0})
+	lock := actual.(*refCountedLock)
+
+	// Increment counter if the lock already exists
+	if actual != nil {
+		atomic.AddInt32(&lock.count, 1)
+	}
+	return lock
+}
+
+func (p *Posix) releaseLock(key string, lock *refCountedLock) {
+	lock.mu.Unlock()
+
+	// Decrement the counter
+	if atomic.AddInt32(&lock.count, -1) == 0 {
+		p.objLocks.Delete(key)
+	}
+}
+
 func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3response.PutObjectOutput, error) {
 	acct, ok := ctx.Value("account").(auth.Account)
 	if !ok {
@@ -2228,13 +2258,20 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons
 	}
 	vEnabled := p.isBucketVersioningEnabled(vStatus)
 
+	// Lock the call, if it indicates a new object version creation
+	if vEnabled {
+		objLock := p.getLock(name)
+		objLock.mu.Lock()
+		defer p.releaseLock(name, objLock)
+	}
+
 	// object is file
 	d, err := os.Stat(name)
 	if err == nil && d.IsDir() {
 		return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrExistingObjectIsDirectory)
 	}
 
-	// if the versioninng is enabled first create the file object version
+	// if the versioning is enabled first create the file object version
 	if p.versioningEnabled() && vStatus != "" && err == nil {
 		var isVersionIdMissing bool
 		if p.isBucketVersioningSuspended(vStatus) {
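The backend change above introduces a per-object lock that is created on first use and dropped once its last holder releases it. The standalone Go sketch below illustrates that reference-counted keyed-lock pattern in isolation; keyedLocker, acquire, and release are illustrative names for this sketch only and are not part of the gateway's code.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// refCountedLock pairs a mutex with a count of its current users.
type refCountedLock struct {
	mu    sync.Mutex
	count int32
}

// keyedLocker hands out one lock per key and discards idle entries.
type keyedLocker struct {
	locks sync.Map // key string -> *refCountedLock
}

// acquire registers interest in the key's lock and then locks it.
func (kl *keyedLocker) acquire(key string) *refCountedLock {
	actual, _ := kl.locks.LoadOrStore(key, &refCountedLock{})
	lock := actual.(*refCountedLock)
	atomic.AddInt32(&lock.count, 1) // holders + waiters for this key
	lock.mu.Lock()
	return lock
}

// release unlocks and deletes the map entry once nobody references it,
// mirroring the cleanup done in the diff's releaseLock.
func (kl *keyedLocker) release(key string, lock *refCountedLock) {
	lock.mu.Unlock()
	if atomic.AddInt32(&lock.count, -1) == 0 {
		kl.locks.Delete(key)
	}
}

func main() {
	kl := &keyedLocker{}
	var wg sync.WaitGroup
	counter := 0

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l := kl.acquire("bucket/object")
			defer kl.release("bucket/object", l)
			counter++ // access to the shared counter is serialized per key
		}()
	}

	wg.Wait()
	fmt.Println("counter:", counter) // expected 5
}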
@@ -616,7 +616,7 @@ func TestVersioning(s *S3Conf) {
 	Versioning_WORM_obj_version_locked_with_governance_retention(s)
 	Versioning_WORM_obj_version_locked_with_compliance_retention(s)
 	// Concurrent requests
-	//Versioninig_concurrent_upload_object(s)
+	Versioning_concurrent_upload_object(s)
 }
 
 func TestVersioningDisabled(s *S3Conf) {
@@ -27,6 +27,7 @@ import (
 	"net/http"
 	"net/url"
 	"regexp"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -13386,34 +13387,43 @@ func Versioning_concurrent_upload_object(s *S3Conf) error {
 	versionCount := 5
 	// Channel to collect errors
 	errCh := make(chan error, versionCount)
+	// Channel to collect object verisons
+	versionsCh := make(chan types.ObjectVersion, versionCount)
 
-	uploadVersion := func(wg *sync.WaitGroup) {
+	uploadVersion := func(wg *sync.WaitGroup, length int64) {
 		defer wg.Done()
 
-		ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
-		res, err := s3client.PutObject(ctx, &s3.PutObjectInput{
+		out, err := putObjectWithData(length, &s3.PutObjectInput{
 			Bucket: &bucket,
 			Key:    &obj,
-		})
-		cancel()
+		}, s3client)
 		if err != nil {
 			// Send error to the channel
 			errCh <- err
 			return
 		}
 
-		fmt.Printf("uploaded object successfully: versionId: %v\n", *res.VersionId)
+		versionsCh <- types.ObjectVersion{
+			ETag:         out.res.ETag,
+			StorageClass: types.ObjectVersionStorageClassStandard,
+			IsLatest:     getBoolPtr(false),
+			Key:          &obj,
+			VersionId:    out.res.VersionId,
+			Size:         &length,
+		}
+
+		fmt.Printf("uploaded object successfully: versionId: %v\n", *out.res.VersionId)
 	}
 
 	wg := &sync.WaitGroup{}
 	wg.Add(versionCount)
 
 	for i := 0; i < versionCount; i++ {
-		go uploadVersion(wg)
+		go uploadVersion(wg, int64(i*100))
 	}
 
 	wg.Wait()
 	close(errCh)
+	close(versionsCh)
 
 	// Check if there were any errors
 	for err := range errCh {
@@ -13423,6 +13433,18 @@ func Versioning_concurrent_upload_object(s *S3Conf) error {
 		}
 	}
 
+	versions := []types.ObjectVersion{}
+
+	for el := range versionsCh {
+		versions = append(versions, el)
+	}
+
+	sort.SliceStable(versions, func(i, j int) bool {
+		return *versions[i].VersionId > *versions[j].VersionId
+	})
+
+	versions[0].IsLatest = getBoolPtr(true)
+
 	// List object versions after all uploads
 	ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
 	res, err := s3client.ListObjectVersions(ctx, &s3.ListObjectVersionsInput{
@@ -13433,8 +13455,8 @@ func Versioning_concurrent_upload_object(s *S3Conf) error {
 		return err
 	}
 
-	if len(res.Versions) != versionCount {
-		return fmt.Errorf("expected %v object versions, instead got %v", versionCount, len(res.Versions))
+	if !compareVersions(versions, res.Versions) {
+		return fmt.Errorf("expected the object versions to be %v, instead got %v", versions, res.Versions)
 	}
 
 	return nil
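The rewritten test follows a common fan-out/collect shape: each goroutine reports into buffered channels, the channels are closed only after wg.Wait(), and the drained results are sorted before being compared against the listing. The condensed Go sketch below shows that shape on its own; the result type and the stubbed upload stand in for the suite's putObjectWithData and compareVersions helpers, which are not reproduced here.

package main

import (
	"fmt"
	"sort"
	"sync"
)

// result is a stand-in for the per-version data the real test collects.
type result struct {
	versionID string
	size      int64
}

func main() {
	const n = 5
	errCh := make(chan error, n) // buffered so workers never block on send
	resCh := make(chan result, n)

	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(length int64) {
			defer wg.Done()
			// A real upload would send any failure to errCh and return;
			// here the stub always succeeds.
			resCh <- result{versionID: fmt.Sprintf("v-%03d", length), size: length}
		}(int64(i * 100))
	}

	wg.Wait()
	close(errCh)
	close(resCh)

	// Surface the first worker error, if any.
	for err := range errCh {
		if err != nil {
			fmt.Println("upload failed:", err)
			return
		}
	}

	// Drain the results, then order them the way the listing is expected
	// to return them (newest version ID first in this sketch).
	results := []result{}
	for r := range resCh {
		results = append(results, r)
	}
	sort.SliceStable(results, func(i, j int) bool {
		return results[i].versionID > results[j].versionID
	})

	fmt.Printf("collected versions: %+v\n", results)
}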