Compare commits
3 Commits
RELEASE.20
...
RELEASE.20
| Author | SHA1 | Date |
|---|---|---|
| | 6105cc6ab4 | |
| | 068ad33c9a | |
| | 2db5cbc7a5 | |
```diff
@@ -79,7 +79,7 @@ func (n *nsLockMap) lock(ctx context.Context, volume string, path string, lockSo
 	nsLk, found := n.lockMap[resource]
 	if !found {
 		nsLk = &nsLock{
-			LRWMutex: lsync.NewLRWMutex(ctx),
+			LRWMutex: lsync.NewLRWMutex(),
 		}
 		// Add a count to indicate that a parallel unlock doesn't clear this entry.
 	}
@@ -89,9 +89,9 @@ func (n *nsLockMap) lock(ctx context.Context, volume string, path string, lockSo
 
 	// Locking here will block (until timeout).
 	if readLock {
-		locked = nsLk.GetRLock(opsID, lockSource, timeout)
+		locked = nsLk.GetRLock(ctx, opsID, lockSource, timeout)
 	} else {
-		locked = nsLk.GetLock(opsID, lockSource, timeout)
+		locked = nsLk.GetLock(ctx, opsID, lockSource, timeout)
 	}
 
 	if !locked { // We failed to get the lock
@@ -139,6 +139,7 @@ func (n *nsLockMap) unlock(volume string, path string, readLock bool) {
 type distLockInstance struct {
 	rwMutex *dsync.DRWMutex
 	opsID   string
+	ctx     context.Context
 }
 
 // Lock - block until write lock is taken or timeout has occurred.
@@ -146,7 +147,7 @@ func (di *distLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error)
 	lockSource := getSource(2)
 	start := UTCNow()
 
-	if !di.rwMutex.GetLock(di.opsID, lockSource, timeout.Timeout()) {
+	if !di.rwMutex.GetLock(di.ctx, di.opsID, lockSource, timeout.Timeout()) {
 		timeout.LogFailure()
 		return OperationTimedOut{}
 	}
@@ -163,7 +164,7 @@ func (di *distLockInstance) Unlock() {
 func (di *distLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr error) {
 	lockSource := getSource(2)
 	start := UTCNow()
-	if !di.rwMutex.GetRLock(di.opsID, lockSource, timeout.Timeout()) {
+	if !di.rwMutex.GetRLock(di.ctx, di.opsID, lockSource, timeout.Timeout()) {
 		timeout.LogFailure()
 		return OperationTimedOut{}
 	}
@@ -191,10 +192,10 @@ type localLockInstance struct {
 func (n *nsLockMap) NewNSLock(ctx context.Context, lockersFn func() []dsync.NetLocker, volume string, paths ...string) RWLocker {
 	opsID := mustGetUUID()
 	if n.isDistXL {
-		drwmutex := dsync.NewDRWMutex(ctx, &dsync.Dsync{
+		drwmutex := dsync.NewDRWMutex(&dsync.Dsync{
 			GetLockersFn: lockersFn,
 		}, pathsJoinPrefix(volume, paths...)...)
-		return &distLockInstance{drwmutex, opsID}
+		return &distLockInstance{drwmutex, opsID, ctx}
 	}
 	sort.Strings(paths)
 	return &localLockInstance{ctx, n, volume, paths, opsID}
```
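Taken together, these hunks move the context from lock construction to lock acquisition: `NewNSLock` still receives the caller's `ctx`, but instead of baking it into `dsync.NewDRWMutex` it now stores it on `distLockInstance`, whose `GetLock`/`GetRLock` wrappers forward it on every attempt. A minimal sketch of the caller-visible effect, assuming the surrounding server code provides the `nsLockMap` (`n`), the locker factory (`lockers`), and a `dynamicTimeout` (`lockTimeout`), none of which are shown in this diff:

```go
// Sketch only, not code from this changeset.
lk := n.NewNSLock(r.Context(), lockers, "bucket", "object")

// If the HTTP client disconnects, r.Context() is cancelled and the retry
// loop behind GetLock gives up early instead of waiting out the timeout.
if err := lk.GetLock(lockTimeout); err != nil {
	return err
}
defer lk.Unlock()
```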
```diff
@@ -770,7 +770,7 @@ func (s *peerRESTServer) ServerUpdateHandler(w http.ResponseWriter, r *http.Requ
 	var latestReleaseTime time.Time
 	var err error
 	if latestRelease := vars[peerRESTLatestRelease]; latestRelease != "" {
-		latestReleaseTime, err = time.Parse(latestRelease, time.RFC3339)
+		latestReleaseTime, err = time.Parse(time.RFC3339, latestRelease)
 		if err != nil {
 			s.writeErrorResponse(w, err)
 			return
```
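This hunk is a straight bug fix rather than context plumbing: `time.Parse` takes the layout first and the value second, so the old call used the release string as the layout and could never succeed. A self-contained illustration of the difference:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	v := "2020-03-24T10:00:00Z"

	// Swapped arguments, as in the old code: v is treated as the layout
	// and the RFC 3339 reference layout as the value, so this errors out.
	if _, err := time.Parse(v, time.RFC3339); err != nil {
		fmt.Println("swapped:", err)
	}

	// Correct order: layout first, value second.
	ts, err := time.Parse(time.RFC3339, v)
	fmt.Println(ts, err) // 2020-03-24 10:00:00 +0000 UTC <nil>
}
```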
```diff
@@ -1021,6 +1021,10 @@ func (xl xlObjects) PutObjectTags(ctx context.Context, bucket, object string, ta
 	}
 
 	for i, xlMeta := range metaArr {
+		if errs[i] != nil {
+			// Avoid disks where loading metadata fail
+			continue
+		}
 		// clean xlMeta.Meta of tag key, before updating the new tags
 		delete(xlMeta.Meta, xhttp.AmzObjectTagging)
 		// Don't update for empty tags
```
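The four added lines guard a convention used throughout this code path: metadata is loaded from every disk into `metaArr` alongside a parallel `errs` slice, one entry per disk, and consumers must skip indices whose load failed instead of writing tags over missing or stale metadata. A small self-contained sketch of the parallel-slice pattern (names are illustrative, not from the source):

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	// Parallel slices: metas[i] and errs[i] describe the same disk.
	metas := []string{"meta-disk-0", "meta-disk-1", "meta-disk-2"}
	errs := []error{nil, errors.New("disk offline"), nil}

	for i, meta := range metas {
		if errs[i] != nil {
			// Avoid disks where loading metadata failed.
			continue
		}
		fmt.Println("updating tags on", meta)
	}
}
```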
```diff
@@ -54,7 +54,6 @@ type DRWMutex struct {
 	readersLocks [][]string // Array of array of nodes that granted reader locks
 	m            sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
 	clnt         *Dsync
-	ctx          context.Context
 }
 
 // Granted - represents a structure of a granted lock.
@@ -72,12 +71,11 @@ func isLocked(uid string) bool {
 }
 
 // NewDRWMutex - initializes a new dsync RW mutex.
-func NewDRWMutex(ctx context.Context, clnt *Dsync, names ...string) *DRWMutex {
+func NewDRWMutex(clnt *Dsync, names ...string) *DRWMutex {
 	return &DRWMutex{
 		writeLocks: make([]string, len(clnt.GetLockersFn())),
 		Names:      names,
 		clnt:       clnt,
-		ctx:        ctx,
 	}
 }
```
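For code that consumes `dsync` directly, the migration is mechanical: drop `ctx` from the constructor and supply one per acquisition. A hedged before/after sketch (the `ds *Dsync` value and the `id`/`source` strings are assumed to exist):

```go
// Before: one ctx was captured for the whole lifetime of the mutex.
old := dsync.NewDRWMutex(ctx, ds, "resource")
ok := old.GetLock(id, source, 30*time.Second)

// After: the mutex is context-free; each acquisition brings its own ctx,
// so one long-lived mutex can serve many independently cancellable callers.
dm := dsync.NewDRWMutex(ds, "resource")
ok = dm.GetLock(ctx, id, source, 30*time.Second)
```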
```diff
@@ -88,7 +86,7 @@ func NewDRWMutex(ctx context.Context, clnt *Dsync, names ...string) *DRWMutex {
 func (dm *DRWMutex) Lock(id, source string) {
 
 	isReadLock := false
-	dm.lockBlocking(drwMutexInfinite, id, source, isReadLock)
+	dm.lockBlocking(context.Background(), drwMutexInfinite, id, source, isReadLock)
 }
 
 // GetLock tries to get a write lock on dm before the timeout elapses.
@@ -96,10 +94,10 @@ func (dm *DRWMutex) Lock(id, source string) {
 // If the lock is already in use, the calling go routine
 // blocks until either the mutex becomes available and return success or
 // more time has passed than the timeout value and return false.
-func (dm *DRWMutex) GetLock(id, source string, timeout time.Duration) (locked bool) {
+func (dm *DRWMutex) GetLock(ctx context.Context, id, source string, timeout time.Duration) (locked bool) {
 
 	isReadLock := false
-	return dm.lockBlocking(timeout, id, source, isReadLock)
+	return dm.lockBlocking(ctx, timeout, id, source, isReadLock)
 }
 
 // RLock holds a read lock on dm.
@@ -109,7 +107,7 @@ func (dm *DRWMutex) GetLock(id, source string, timeout time.Duration) (locked bo
 func (dm *DRWMutex) RLock(id, source string) {
 
 	isReadLock := true
-	dm.lockBlocking(drwMutexInfinite, id, source, isReadLock)
+	dm.lockBlocking(context.Background(), drwMutexInfinite, id, source, isReadLock)
 }
 
 // GetRLock tries to get a read lock on dm before the timeout elapses.
@@ -118,10 +116,10 @@ func (dm *DRWMutex) RLock(id, source string) {
 // Otherwise the calling go routine blocks until either the mutex becomes
 // available and return success or more time has passed than the timeout
 // value and return false.
-func (dm *DRWMutex) GetRLock(id, source string, timeout time.Duration) (locked bool) {
+func (dm *DRWMutex) GetRLock(ctx context.Context, id, source string, timeout time.Duration) (locked bool) {
 
 	isReadLock := true
-	return dm.lockBlocking(timeout, id, source, isReadLock)
+	return dm.lockBlocking(ctx, timeout, id, source, isReadLock)
 }
 
 // lockBlocking will try to acquire either a read or a write lock
@@ -129,10 +127,10 @@ func (dm *DRWMutex) GetRLock(id, source string, timeout time.Duration) (locked b
 // The function will loop using a built-in timing randomized back-off
 // algorithm until either the lock is acquired successfully or more
 // time has elapsed than the timeout value.
-func (dm *DRWMutex) lockBlocking(timeout time.Duration, id, source string, isReadLock bool) (locked bool) {
+func (dm *DRWMutex) lockBlocking(ctx context.Context, timeout time.Duration, id, source string, isReadLock bool) (locked bool) {
 	restClnts := dm.clnt.GetLockersFn()
 
-	retryCtx, cancel := context.WithTimeout(dm.ctx, timeout)
+	retryCtx, cancel := context.WithTimeout(ctx, timeout)
 
 	defer cancel()
```
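The last hunk is the heart of the change: `context.WithTimeout` now derives the retry deadline from the caller's context instead of one frozen at construction, so either cancellation or the timeout can stop the back-off loop. A runnable sketch of that shape using only the standard library (the ticker stands in for dsync's randomized back-off timer):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// tryLock stands in for one non-blocking acquisition attempt.
func tryLock() bool { return false }

func getLock(ctx context.Context, timeout time.Duration) bool {
	// Either the caller's ctx or the timeout ends the retries.
	retryCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	tick := time.NewTicker(10 * time.Millisecond)
	defer tick.Stop()

	for {
		if tryLock() {
			return true
		}
		select {
		case <-retryCtx.Done():
			return false
		case <-tick.C:
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() { time.Sleep(50 * time.Millisecond); cancel() }()

	// Prints false after ~50ms (caller cancelled), not after 5s.
	fmt.Println(getLock(ctx, 5*time.Second))
}
```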
```diff
@@ -14,8 +14,6 @@
  * limitations under the License.
  */
 
-// GOMAXPROCS=10 go test
-
 package dsync_test
 
 import (
@@ -36,14 +34,14 @@ const (
 
 func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 
-	drwm := NewDRWMutex(context.Background(), ds, "simplelock")
+	drwm := NewDRWMutex(ds, "simplelock")
 
-	if !drwm.GetRLock(id, source, time.Second) {
+	if !drwm.GetRLock(context.Background(), id, source, time.Second) {
 		panic("Failed to acquire read lock")
 	}
 	// fmt.Println("1st read lock acquired, waiting...")
 
-	if !drwm.GetRLock(id, source, time.Second) {
+	if !drwm.GetRLock(context.Background(), id, source, time.Second) {
 		panic("Failed to acquire read lock")
 	}
 	// fmt.Println("2nd read lock acquired, waiting...")
@@ -61,7 +59,7 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 	}()
 
 	// fmt.Println("Trying to acquire write lock, waiting...")
-	locked = drwm.GetLock(id, source, duration)
+	locked = drwm.GetLock(context.Background(), id, source, duration)
 	if locked {
 		// fmt.Println("Write lock acquired, waiting...")
 		time.Sleep(time.Second)
@@ -92,10 +90,10 @@ func TestSimpleWriteLockTimedOut(t *testing.T) {
 
 func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 
-	drwm := NewDRWMutex(context.Background(), ds, "duallock")
+	drwm := NewDRWMutex(ds, "duallock")
 
 	// fmt.Println("Getting initial write lock")
-	if !drwm.GetLock(id, source, time.Second) {
+	if !drwm.GetLock(context.Background(), id, source, time.Second) {
 		panic("Failed to acquire initial write lock")
 	}
 
@@ -106,7 +104,7 @@ func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 	}()
 
 	// fmt.Println("Trying to acquire 2nd write lock, waiting...")
-	locked = drwm.GetLock(id, source, duration)
+	locked = drwm.GetLock(context.Background(), id, source, duration)
 	if locked {
 		// fmt.Println("2nd write lock acquired, waiting...")
 		time.Sleep(time.Second)
@@ -140,8 +138,8 @@ func TestDualWriteLockTimedOut(t *testing.T) {
 // Test cases below are copied 1 to 1 from sync/rwmutex_test.go (adapted to use DRWMutex)
 
 // Borrowed from rwmutex_test.go
-func parallelReader(m *DRWMutex, clocked, cunlock, cdone chan bool) {
-	if m.GetRLock(id, source, time.Second) {
+func parallelReader(ctx context.Context, m *DRWMutex, clocked, cunlock, cdone chan bool) {
+	if m.GetRLock(ctx, id, source, time.Second) {
 		clocked <- true
 		<-cunlock
 		m.RUnlock()
@@ -152,13 +150,13 @@ func parallelReader(m *DRWMutex, clocked, cunlock, cdone chan bool) {
 // Borrowed from rwmutex_test.go
 func doTestParallelReaders(numReaders, gomaxprocs int) {
 	runtime.GOMAXPROCS(gomaxprocs)
-	m := NewDRWMutex(context.Background(), ds, "test-parallel")
+	m := NewDRWMutex(ds, "test-parallel")
 
 	clocked := make(chan bool)
 	cunlock := make(chan bool)
 	cdone := make(chan bool)
 	for i := 0; i < numReaders; i++ {
-		go parallelReader(m, clocked, cunlock, cdone)
+		go parallelReader(context.Background(), m, clocked, cunlock, cdone)
 	}
 	// Wait for all parallel RLock()s to succeed.
 	for i := 0; i < numReaders; i++ {
@@ -184,7 +182,7 @@ func TestParallelReaders(t *testing.T) {
 // Borrowed from rwmutex_test.go
 func reader(rwm *DRWMutex, numIterations int, activity *int32, cdone chan bool) {
 	for i := 0; i < numIterations; i++ {
-		if rwm.GetRLock(id, source, time.Second) {
+		if rwm.GetRLock(context.Background(), id, source, time.Second) {
 			n := atomic.AddInt32(activity, 1)
 			if n < 1 || n >= 10000 {
 				panic(fmt.Sprintf("wlock(%d)\n", n))
@@ -201,7 +199,7 @@ func reader(rwm *DRWMutex, numIterations int, activity *int32, cdone chan bool)
 // Borrowed from rwmutex_test.go
 func writer(rwm *DRWMutex, numIterations int, activity *int32, cdone chan bool) {
 	for i := 0; i < numIterations; i++ {
-		if rwm.GetLock(id, source, time.Second) {
+		if rwm.GetLock(context.Background(), id, source, time.Second) {
 			n := atomic.AddInt32(activity, 10000)
 			if n != 10000 {
 				panic(fmt.Sprintf("wlock(%d)\n", n))
@@ -220,7 +218,7 @@ func HammerRWMutex(gomaxprocs, numReaders, numIterations int) {
 	runtime.GOMAXPROCS(gomaxprocs)
 	// Number of active readers + 10000 * number of active writers.
 	var activity int32
-	rwm := NewDRWMutex(context.Background(), ds, "test")
+	rwm := NewDRWMutex(ds, "test")
 	cdone := make(chan bool)
 	go writer(rwm, numIterations, &activity, cdone)
 	var i int
@@ -263,7 +261,7 @@ func TestUnlockPanic(t *testing.T) {
 			t.Fatalf("unlock of unlocked RWMutex did not panic")
 		}
 	}()
-	mu := NewDRWMutex(context.Background(), ds, "test")
+	mu := NewDRWMutex(ds, "test")
 	mu.Unlock()
 }
 
@@ -274,7 +272,7 @@ func TestUnlockPanic2(t *testing.T) {
 			t.Fatalf("unlock of unlocked RWMutex did not panic")
 		}
 	}()
-	mu := NewDRWMutex(context.Background(), ds, "test-unlock-panic-2")
+	mu := NewDRWMutex(ds, "test-unlock-panic-2")
 	mu.RLock(id, source)
 	mu.Unlock()
 }
@@ -286,7 +284,7 @@ func TestRUnlockPanic(t *testing.T) {
 			t.Fatalf("read unlock of unlocked RWMutex did not panic")
 		}
 	}()
-	mu := NewDRWMutex(context.Background(), ds, "test")
+	mu := NewDRWMutex(ds, "test")
 	mu.RUnlock()
 }
 
@@ -297,14 +295,14 @@ func TestRUnlockPanic2(t *testing.T) {
 			t.Fatalf("read unlock of unlocked RWMutex did not panic")
 		}
 	}()
-	mu := NewDRWMutex(context.Background(), ds, "test-runlock-panic-2")
+	mu := NewDRWMutex(ds, "test-runlock-panic-2")
 	mu.Lock(id, source)
 	mu.RUnlock()
 }
 
 // Borrowed from rwmutex_test.go
 func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) {
-	rwm := NewDRWMutex(context.Background(), ds, "test")
+	rwm := NewDRWMutex(ds, "test")
 	b.RunParallel(func(pb *testing.PB) {
 		foo := 0
 		for pb.Next() {
```
```diff
@@ -19,7 +19,6 @@
 package dsync_test
 
 import (
-	"context"
 	"fmt"
 	"log"
 	"math/rand"
@@ -89,7 +88,7 @@ func TestMain(m *testing.M) {
 
 func TestSimpleLock(t *testing.T) {
 
-	dm := NewDRWMutex(context.Background(), ds, "test")
+	dm := NewDRWMutex(ds, "test")
 
 	dm.Lock(id, source)
 
@@ -101,7 +100,7 @@ func TestSimpleLock(t *testing.T) {
 
 func TestSimpleLockUnlockMultipleTimes(t *testing.T) {
 
-	dm := NewDRWMutex(context.Background(), ds, "test")
+	dm := NewDRWMutex(ds, "test")
 
 	dm.Lock(id, source)
 	time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond)
@@ -127,8 +126,8 @@ func TestSimpleLockUnlockMultipleTimes(t *testing.T) {
 // Test two locks for same resource, one succeeds, one fails (after timeout)
 func TestTwoSimultaneousLocksForSameResource(t *testing.T) {
 
-	dm1st := NewDRWMutex(context.Background(), ds, "aap")
-	dm2nd := NewDRWMutex(context.Background(), ds, "aap")
+	dm1st := NewDRWMutex(ds, "aap")
+	dm2nd := NewDRWMutex(ds, "aap")
 
 	dm1st.Lock(id, source)
 
@@ -151,9 +150,9 @@ func TestTwoSimultaneousLocksForSameResource(t *testing.T) {
 // Test three locks for same resource, one succeeds, one fails (after timeout)
 func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
 
-	dm1st := NewDRWMutex(context.Background(), ds, "aap")
-	dm2nd := NewDRWMutex(context.Background(), ds, "aap")
-	dm3rd := NewDRWMutex(context.Background(), ds, "aap")
+	dm1st := NewDRWMutex(ds, "aap")
+	dm2nd := NewDRWMutex(ds, "aap")
+	dm3rd := NewDRWMutex(ds, "aap")
 
 	dm1st.Lock(id, source)
 
@@ -216,8 +215,8 @@ func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
 // Test two locks for different resources, both succeed
 func TestTwoSimultaneousLocksForDifferentResources(t *testing.T) {
 
-	dm1 := NewDRWMutex(context.Background(), ds, "aap")
-	dm2 := NewDRWMutex(context.Background(), ds, "noot")
+	dm1 := NewDRWMutex(ds, "aap")
+	dm2 := NewDRWMutex(ds, "noot")
 
 	dm1.Lock(id, source)
 	dm2.Lock(id, source)
@@ -247,7 +246,7 @@ func TestMutex(t *testing.T) {
 		loops = 5
 	}
 	c := make(chan bool)
-	m := NewDRWMutex(context.Background(), ds, "test")
+	m := NewDRWMutex(ds, "test")
 	for i := 0; i < 10; i++ {
 		go HammerMutex(m, loops, c)
 	}
@@ -261,7 +260,7 @@ func BenchmarkMutexUncontended(b *testing.B) {
 		*DRWMutex
 	}
 	b.RunParallel(func(pb *testing.PB) {
-		var mu = PaddedMutex{NewDRWMutex(context.Background(), ds, "")}
+		var mu = PaddedMutex{NewDRWMutex(ds, "")}
 		for pb.Next() {
 			mu.Lock(id, source)
 			mu.Unlock()
@@ -270,7 +269,7 @@ func BenchmarkMutexUncontended(b *testing.B) {
 }
 
 func benchmarkMutex(b *testing.B, slack, work bool) {
-	mu := NewDRWMutex(context.Background(), ds, "")
+	mu := NewDRWMutex(ds, "")
 	if slack {
 		b.SetParallelism(10)
 	}
@@ -313,7 +312,7 @@ func BenchmarkMutexNoSpin(b *testing.B) {
 	// These goroutines yield during local work, so that switching from
 	// a blocked goroutine to other goroutines is profitable.
 	// As a matter of fact, this benchmark still triggers some spinning in the mutex.
-	m := NewDRWMutex(context.Background(), ds, "")
+	m := NewDRWMutex(ds, "")
 	var acc0, acc1 uint64
 	b.SetParallelism(4)
 	b.RunParallel(func(pb *testing.PB) {
@@ -345,7 +344,7 @@ func BenchmarkMutexSpin(b *testing.B) {
 	// profitable. To achieve this we create a goroutine per-proc.
 	// These goroutines access considerable amount of local data so that
 	// unnecessary rescheduling is penalized by cache misses.
-	m := NewDRWMutex(context.Background(), ds, "")
+	m := NewDRWMutex(ds, "")
 	var acc0, acc1 uint64
 	b.RunParallel(func(pb *testing.PB) {
		var data [16 << 10]uint64
```
```diff
@@ -32,12 +32,11 @@ type LRWMutex struct {
 	isWriteLock bool
 	ref         int
 	m           sync.Mutex // Mutex to prevent multiple simultaneous locks
-	ctx         context.Context
 }
 
 // NewLRWMutex - initializes a new lsync RW mutex.
-func NewLRWMutex(ctx context.Context) *LRWMutex {
-	return &LRWMutex{ctx: ctx}
+func NewLRWMutex() *LRWMutex {
+	return &LRWMutex{}
 }
 
 // Lock holds a write lock on lm.
@@ -47,14 +46,14 @@ func NewLRWMutex(ctx context.Context) *LRWMutex {
 func (lm *LRWMutex) Lock() {
 
 	const isWriteLock = true
-	lm.lockLoop(lm.id, lm.source, time.Duration(math.MaxInt64), isWriteLock)
+	lm.lockLoop(context.Background(), lm.id, lm.source, time.Duration(math.MaxInt64), isWriteLock)
 }
 
 // GetLock tries to get a write lock on lm before the timeout occurs.
-func (lm *LRWMutex) GetLock(id string, source string, timeout time.Duration) (locked bool) {
+func (lm *LRWMutex) GetLock(ctx context.Context, id string, source string, timeout time.Duration) (locked bool) {
 
 	const isWriteLock = true
-	return lm.lockLoop(id, source, timeout, isWriteLock)
+	return lm.lockLoop(ctx, id, source, timeout, isWriteLock)
 }
 
 // RLock holds a read lock on lm.
@@ -64,60 +63,55 @@ func (lm *LRWMutex) GetLock(id string, source string, timeout time.Duration) (lo
 func (lm *LRWMutex) RLock() {
 
 	const isWriteLock = false
-	lm.lockLoop(lm.id, lm.source, time.Duration(1<<63-1), isWriteLock)
+	lm.lockLoop(context.Background(), lm.id, lm.source, time.Duration(1<<63-1), isWriteLock)
 }
 
 // GetRLock tries to get a read lock on lm before the timeout occurs.
-func (lm *LRWMutex) GetRLock(id string, source string, timeout time.Duration) (locked bool) {
+func (lm *LRWMutex) GetRLock(ctx context.Context, id string, source string, timeout time.Duration) (locked bool) {
 
 	const isWriteLock = false
-	return lm.lockLoop(id, source, timeout, isWriteLock)
+	return lm.lockLoop(ctx, id, source, timeout, isWriteLock)
+}
+
+func (lm *LRWMutex) lock(id, source string, isWriteLock bool) (locked bool) {
+	lm.m.Lock()
+
+	lm.id = id
+	lm.source = source
+
+	if isWriteLock {
+		if lm.ref == 0 && !lm.isWriteLock {
+			lm.ref = 1
+			lm.isWriteLock = true
+			locked = true
+		}
+	} else {
+		if !lm.isWriteLock {
+			lm.ref++
+			locked = true
+		}
+	}
+
+	lm.m.Unlock()
+
+	return locked
 }
 
 // lockLoop will acquire either a read or a write lock
 //
 // The call will block until the lock is granted using a built-in
 // timing randomized back-off algorithm to try again until successful
-func (lm *LRWMutex) lockLoop(id, source string, timeout time.Duration, isWriteLock bool) bool {
-	retryCtx, cancel := context.WithTimeout(lm.ctx, timeout)
-
+func (lm *LRWMutex) lockLoop(ctx context.Context, id, source string, timeout time.Duration, isWriteLock bool) (locked bool) {
+	retryCtx, cancel := context.WithTimeout(ctx, timeout)
 	defer cancel()
 
+	// We timed out on the previous lock, incrementally wait
+	// for a longer back-off time and try again afterwards.
 	for range retry.NewTimer(retryCtx) {
 		// Try to acquire the lock.
-		var success bool
-		{
-			lm.m.Lock()
-
-			lm.id = id
-			lm.source = source
-
-			if isWriteLock {
-				if lm.ref == 0 && !lm.isWriteLock {
-					lm.ref = 1
-					lm.isWriteLock = true
-					success = true
-				}
-			} else {
-				if !lm.isWriteLock {
-					lm.ref++
-					success = true
-				}
-			}
-
-			lm.m.Unlock()
-		}
-		if success {
+		if lm.lock(id, source, isWriteLock) {
 			return true
 		}
 	}
 
-	// We timed out on the previous lock, incrementally wait
-	// for a longer back-off time and try again afterwards.
-
 	return false
 }
```
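Besides dropping the stored context, the final hunk above extracts the single try-acquire step into a new `lock` method, leaving `lockLoop` to express only the retry policy. Usage after the change, as a hedged sketch (`lsync` is the package alias seen earlier in the diff; the id and source strings are illustrative):

```go
lm := lsync.NewLRWMutex() // no ctx captured at construction anymore

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// The effective deadline is whichever of ctx and the timeout ends first.
if lm.GetLock(ctx, "ops-id", "caller-source", 5*time.Second) {
	defer lm.Unlock()
	// ... exclusive section ...
}
```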
```diff
@@ -33,14 +33,15 @@ import (
 
 func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 
-	lrwm := NewLRWMutex(context.Background())
+	ctx := context.Background()
+	lrwm := NewLRWMutex()
 
-	if !lrwm.GetRLock("", "object1", time.Second) {
+	if !lrwm.GetRLock(ctx, "", "object1", time.Second) {
 		panic("Failed to acquire read lock")
 	}
 	// fmt.Println("1st read lock acquired, waiting...")
 
-	if !lrwm.GetRLock("", "object1", time.Second) {
+	if !lrwm.GetRLock(ctx, "", "object1", time.Second) {
 		panic("Failed to acquire read lock")
 	}
 	// fmt.Println("2nd read lock acquired, waiting...")
@@ -58,7 +59,7 @@ func testSimpleWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 	}()
 
 	// fmt.Println("Trying to acquire write lock, waiting...")
-	locked = lrwm.GetLock("", "", duration)
+	locked = lrwm.GetLock(ctx, "", "", duration)
 	if locked {
 		// fmt.Println("Write lock acquired, waiting...")
 		time.Sleep(1 * time.Second)
@@ -90,10 +91,11 @@ func TestSimpleWriteLockTimedOut(t *testing.T) {
 
 func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 
-	lrwm := NewLRWMutex(context.Background())
+	ctx := context.Background()
+	lrwm := NewLRWMutex()
 
 	// fmt.Println("Getting initial write lock")
-	if !lrwm.GetLock("", "", time.Second) {
+	if !lrwm.GetLock(ctx, "", "", time.Second) {
 		panic("Failed to acquire initial write lock")
 	}
 
@@ -104,7 +106,7 @@ func testDualWriteLock(t *testing.T, duration time.Duration) (locked bool) {
 	}()
 
 	// fmt.Println("Trying to acquire 2nd write lock, waiting...")
-	locked = lrwm.GetLock("", "", duration)
+	locked = lrwm.GetLock(ctx, "", "", duration)
 	if locked {
 		// fmt.Println("2nd write lock acquired, waiting...")
 		time.Sleep(time.Second)
@@ -139,8 +141,8 @@ func TestDualWriteLockTimedOut(t *testing.T) {
 // Test cases below are copied 1 to 1 from sync/rwmutex_test.go (adapted to use LRWMutex)
 
 // Borrowed from rwmutex_test.go
-func parallelReader(m *LRWMutex, clocked, cunlock, cdone chan bool) {
-	if m.GetRLock("", "", time.Second) {
+func parallelReader(ctx context.Context, m *LRWMutex, clocked, cunlock, cdone chan bool) {
+	if m.GetRLock(ctx, "", "", time.Second) {
 		clocked <- true
 		<-cunlock
 		m.RUnlock()
@@ -151,13 +153,13 @@ func parallelReader(m *LRWMutex, clocked, cunlock, cdone chan bool) {
 // Borrowed from rwmutex_test.go
 func doTestParallelReaders(numReaders, gomaxprocs int) {
 	runtime.GOMAXPROCS(gomaxprocs)
-	m := NewLRWMutex(context.Background())
+	m := NewLRWMutex()
 
 	clocked := make(chan bool)
 	cunlock := make(chan bool)
 	cdone := make(chan bool)
 	for i := 0; i < numReaders; i++ {
-		go parallelReader(m, clocked, cunlock, cdone)
+		go parallelReader(context.Background(), m, clocked, cunlock, cdone)
 	}
 	// Wait for all parallel RLock()s to succeed.
 	for i := 0; i < numReaders; i++ {
@@ -183,7 +185,7 @@ func TestParallelReaders(t *testing.T) {
 // Borrowed from rwmutex_test.go
 func reader(rwm *LRWMutex, numIterations int, activity *int32, cdone chan bool) {
 	for i := 0; i < numIterations; i++ {
-		if rwm.GetRLock("", "", time.Second) {
+		if rwm.GetRLock(context.Background(), "", "", time.Second) {
 			n := atomic.AddInt32(activity, 1)
 			if n < 1 || n >= 10000 {
 				panic(fmt.Sprintf("wlock(%d)\n", n))
@@ -200,7 +202,7 @@ func reader(rwm *LRWMutex, numIterations int, activity *int32, cdone chan bool)
 // Borrowed from rwmutex_test.go
 func writer(rwm *LRWMutex, numIterations int, activity *int32, cdone chan bool) {
 	for i := 0; i < numIterations; i++ {
-		if rwm.GetLock("", "", time.Second) {
+		if rwm.GetLock(context.Background(), "", "", time.Second) {
 			n := atomic.AddInt32(activity, 10000)
 			if n != 10000 {
 				panic(fmt.Sprintf("wlock(%d)\n", n))
@@ -219,7 +221,7 @@ func HammerRWMutex(gomaxprocs, numReaders, numIterations int) {
 	runtime.GOMAXPROCS(gomaxprocs)
 	// Number of active readers + 10000 * number of active writers.
 	var activity int32
-	rwm := NewLRWMutex(context.Background())
+	rwm := NewLRWMutex()
 	cdone := make(chan bool)
 	go writer(rwm, numIterations, &activity, cdone)
 	var i int
@@ -257,7 +259,7 @@ func TestRWMutex(t *testing.T) {
 
 // Borrowed from rwmutex_test.go
 func TestDRLocker(t *testing.T) {
-	wl := NewLRWMutex(context.Background())
+	wl := NewLRWMutex()
 	var rl sync.Locker
 	wlocked := make(chan bool, 1)
 	rlocked := make(chan bool, 1)
@@ -298,7 +300,7 @@ func TestUnlockPanic(t *testing.T) {
 			t.Fatalf("unlock of unlocked RWMutex did not panic")
 		}
 	}()
-	mu := NewLRWMutex(context.Background())
+	mu := NewLRWMutex()
 	mu.Unlock()
 }
 
@@ -309,7 +311,7 @@ func TestUnlockPanic2(t *testing.T) {
 			t.Fatalf("unlock of unlocked RWMutex did not panic")
 		}
 	}()
-	mu := NewLRWMutex(context.Background())
+	mu := NewLRWMutex()
 	mu.RLock()
 	mu.Unlock()
 }
@@ -321,7 +323,7 @@ func TestRUnlockPanic(t *testing.T) {
 			t.Fatalf("read unlock of unlocked RWMutex did not panic")
 		}
 	}()
-	mu := NewLRWMutex(context.Background())
+	mu := NewLRWMutex()
 	mu.RUnlock()
 }
 
@@ -332,7 +334,7 @@ func TestRUnlockPanic2(t *testing.T) {
 			t.Fatalf("read unlock of unlocked RWMutex did not panic")
 		}
 	}()
-	mu := NewLRWMutex(context.Background())
+	mu := NewLRWMutex()
 	mu.Lock()
 	mu.RUnlock()
 }
```