From 1c5af7c31aea44bbd41c09f4b2d14e2f5a32845e Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Mon, 21 Aug 2023 16:44:50 -0700
Subject: [PATCH] serialize queueMRFHeal(), add timeouts and avoid normal build-ups (#17886)

we expect a certain level of IOPs and latency, so this is okay.

fixes other miscellaneous bugs

- such as hanging on mrfCh <- when the context is canceled
- queuing MRF heal when the context is canceled
- remove unused saveStateCh channel
---
 cmd/bucket-replication.go | 93 ++++++++++++++++-----------------------
 cmd/notification.go       |  5 +--
 2 files changed, 41 insertions(+), 57 deletions(-)

diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index b160cc2f9..b98b5515e 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -27,9 +27,7 @@ import (
 	"math/rand"
 	"net/http"
 	"net/url"
-	"os"
 	"path"
-	"path/filepath"
 	"reflect"
 	"strings"
 	"sync"
@@ -1631,6 +1629,7 @@ type ReplicationPool struct {
 	ctx      context.Context
 	priority string
 	mu       sync.RWMutex
+	mrfMU    sync.Mutex
 	resyncer *replicationResyncer
 
 	// workers:
@@ -1644,7 +1643,6 @@ type ReplicationPool struct {
 	mrfSaveCh     chan MRFReplicateEntry
 	mrfStopCh     chan struct{}
 	mrfWorkerSize int
-	saveStateCh   chan struct{}
 }
 
 // ReplicationWorkerOperation is a shared interface of replication operations.
@@ -1704,7 +1702,6 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool
 		resyncer:    newresyncer(),
 		mrfSaveCh:   make(chan MRFReplicateEntry, 100000),
 		mrfStopCh:   make(chan struct{}, 1),
-		saveStateCh: make(chan struct{}, 1),
 		ctx:         ctx,
 		objLayer:    o,
 		priority:    priority,
@@ -3035,13 +3032,11 @@ func (p *ReplicationPool) persistMRF() {
 		return
 	}
 
-	var mu sync.Mutex
 	entries := make(map[string]MRFReplicateEntry)
 	mTimer := time.NewTimer(mrfSaveInterval)
 	defer mTimer.Stop()
+
 	saveMRFToDisk := func(drain bool) {
-		mu.Lock()
-		defer mu.Unlock()
 		if len(entries) == 0 {
 			return
 		}
@@ -3053,12 +3048,16 @@ func (p *ReplicationPool) persistMRF() {
 				entries[e.versionID] = e
 			}
 		}
+
 		// queue all entries for healing before overwriting the node mrf file
-		p.queueMRFHeal()
+		if !contextCanceled(p.ctx) {
+			p.queueMRFHeal()
+		}
 
 		if err := p.saveMRFEntries(cctx, entries); err != nil {
 			logger.LogOnceIf(p.ctx, fmt.Errorf("unable to persist replication failures to disk:%w", err), string(replicationSubsystem))
 		}
+
 		entries = make(map[string]MRFReplicateEntry)
 	}
 
@@ -3071,21 +3070,14 @@ func (p *ReplicationPool) persistMRF() {
 			close(p.mrfSaveCh)
 			saveMRFToDisk(true)
 			return
-		case <-p.saveStateCh:
-			saveMRFToDisk(true)
-			return
 		case e, ok := <-p.mrfSaveCh:
 			if !ok {
 				return
 			}
-			var cnt int
-			mu.Lock()
-			entries[e.versionID] = e
-			cnt = len(entries)
-			mu.Unlock()
-			if cnt >= mrfMaxEntries {
+			entries[e.versionID] = e
+			if len(entries) >= mrfMaxEntries {
 				saveMRFToDisk(true)
 			}
 		}
 	}
 }
@@ -3119,6 +3111,7 @@ func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string
 	if len(entries) == 0 {
 		return nil
 	}
+
 	v := MRFReplicateEntries{
 		Entries: entries,
 		Version: mrfMetaVersionV1,
@@ -3134,41 +3127,19 @@ func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string
 		return err
 	}
 
-	for _, diskPath := range globalEndpoints.LocalDisksPaths() {
-		// write to first drive
-		mrfDir := filepath.Join(diskPath, minioMetaBucket, replicationMRFDir)
-		mrfFileName := filepath.Join(mrfDir, globalLocalNodeNameHex+".bin")
-		if err := os.MkdirAll(mrfDir, 0o777); err != nil {
-			return err
-		}
-		file, err := OpenFile(mrfFileName, os.O_CREATE|os.O_WRONLY|writeMode, 0o666)
-		if err != nil {
-			continue
-		}
-		defer file.Close()
-		if _, err = file.Write(buf); err != nil {
-			return err
-		}
-		break
+	for _, localDrive := range globalLocalDrives {
+		if err := localDrive.WriteAll(ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), buf); err == nil {
globalLocalNodeNameHex+".bin"), buf); err == nil { + break } - file, err := OpenFile(mrfFileName, os.O_CREATE|os.O_WRONLY|writeMode, 0o666) - if err != nil { - continue - } - defer file.Close() - if _, err = file.Write(buf); err != nil { - return err - } - break } return nil } // load mrf entries from disk -func (p *ReplicationPool) loadMRF(fileName string) (re MRFReplicateEntries, e error) { +func (p *ReplicationPool) loadMRF(data []byte) (re MRFReplicateEntries, e error) { if !p.initialized() { return re, nil } - file, err := Open(fileName) - if err != nil { - return re, err - } - defer file.Close() - - data, err := io.ReadAll(file) - if err != nil { - return re, err - } if len(data) == 0 { // Seems to be empty. return re, nil @@ -3188,7 +3159,7 @@ func (p *ReplicationPool) loadMRF(fileName string) (re MRFReplicateEntries, e er return re, fmt.Errorf("replication mrf: unknown version: %d", binary.LittleEndian.Uint16(data[2:4])) } // OK, parse data. - if _, err = re.UnmarshalMsg(data[4:]); err != nil { + if _, err := re.UnmarshalMsg(data[4:]); err != nil { return re, err } @@ -3233,29 +3204,40 @@ func (p *ReplicationPool) processMRF() { // process sends error logs to the heal channel for an attempt to heal replication. func (p *ReplicationPool) queueMRFHeal() error { + p.mrfMU.Lock() + defer p.mrfMU.Unlock() + if !p.initialized() { return errServerNotInitialized } - for _, diskPath := range globalEndpoints.LocalDisksPaths() { - fileName := filepath.Join(diskPath, minioMetaBucket, replicationMRFDir, globalLocalNodeNameHex+".bin") - mrfRec, err := p.loadMRF(fileName) + for _, localDrive := range globalLocalDrives { + buf, err := localDrive.ReadAll(p.ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin")) if err != nil { - return err + continue } + + mrfRec, err := p.loadMRF(buf) + if err != nil { + continue + } + // finally delete the file after processing mrf entries - os.Remove(fileName) + localDrive.Delete(p.ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), DeleteOptions{}) // queue replication heal in a goroutine to avoid holding up mrf save routine go func(mrfRec MRFReplicateEntries) { for vID, e := range mrfRec.Entries { + ctx, cancel := context.WithTimeout(p.ctx, time.Second) // Do not waste more than a second on this. 
-				oi, err := p.objLayer.GetObjectInfo(p.ctx, e.Bucket, e.Object, ObjectOptions{
+				ctx, cancel := context.WithTimeout(p.ctx, time.Second) // Do not waste more than a second on this.
+				oi, err := p.objLayer.GetObjectInfo(ctx, e.Bucket, e.Object, ObjectOptions{
 					VersionID: vID,
 				})
+				cancel()
 				if err != nil {
 					continue
 				}
+
 				QueueReplicationHeal(p.ctx, e.Bucket, oi, e.RetryCount)
 			}
 		}(mrfRec)
@@ -3351,11 +3333,14 @@ func (p *ReplicationPool) getMRF(ctx context.Context, bucket string) (ch chan ma
 	mrfCh := make(chan madmin.ReplicationMRF, 100)
 	go func() {
 		defer close(mrfCh)
-		for _, diskPath := range globalEndpoints.LocalDisksPaths() {
-			file := filepath.Join(diskPath, minioMetaBucket, replicationMRFDir, globalLocalNodeNameHex+".bin")
-			mrfRec, err := p.loadMRF(file)
+		for _, localDrive := range globalLocalDrives {
+			buf, err := localDrive.ReadAll(ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"))
 			if err != nil {
-				break
+				continue
+			}
+			mrfRec, err := p.loadMRF(buf)
+			if err != nil {
+				continue
 			}
 			for vID, e := range mrfRec.Entries {
 				if e.Bucket != bucket && bucket != "" {
diff --git a/cmd/notification.go b/cmd/notification.go
index 1df58926b..417813257 100644
--- a/cmd/notification.go
+++ b/cmd/notification.go
@@ -1520,15 +1520,14 @@ func (sys *NotificationSys) GetReplicationMRF(ctx context.Context, bucket, node
 			select {
 			case <-ctx.Done():
 				return err
-			default:
-				mrfCh <- e
+			case mrfCh <- e:
 			}
 		}
 		return nil
 	}(mrfCh)
 	go func(wg *sync.WaitGroup) {
 		wg.Wait()
-		defer close(mrfCh)
+		close(mrfCh)
 	}(&wg)
 	return mrfCh, nil
 }
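
A note on the notification.go hunk, for readers following the "hanging on mrfCh <-" fix: the old code selected on ctx.Done() but performed the send under a default: branch, so the blocking send could wedge the goroutine forever once the context was canceled and the reader went away. Making the send itself a select case lets cancellation and the send race fairly. Below is a minimal standalone sketch of the same pattern; the produce/entry names are illustrative only, not from the MinIO codebase:

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	type entry struct{ id int }

	// produce forwards entries to out until ctx is canceled. Because the
	// send is a select case (not a blocking send under default:), a full,
	// never-drained channel cannot wedge this goroutine forever.
	func produce(ctx context.Context, out chan<- entry) error {
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return ctx.Err() // exit instead of hanging on out <- e
			case out <- entry{id: i}:
			}
		}
	}

	func main() {
		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
		defer cancel()

		out := make(chan entry) // unbuffered: each send blocks until read
		done := make(chan error, 1)
		go func() { done <- produce(ctx, out) }()

		<-out               // consume one entry, then stop reading
		fmt.Println(<-done) // prints "context deadline exceeded"
	}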
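
The queueMRFHeal() change also shows the per-iteration timeout idiom behind the patch's subject line: context.WithTimeout with an explicit cancel() immediately after the call, rather than defer, since defer inside a loop would accumulate live timers until the enclosing function returns. A hedged sketch under assumed names, where lookup stands in for a metadata call such as GetObjectInfo:

	package main

	import (
		"context"
		"errors"
		"fmt"
		"time"
	)

	// lookup simulates a metadata fetch that may be slow; it returns
	// ctx.Err() if the budget expires before the work completes.
	func lookup(ctx context.Context, d time.Duration) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	func main() {
		for _, d := range []time.Duration{10 * time.Millisecond, 2 * time.Second} {
			// Cap each iteration at one second, mirroring the patch's
			// "do not waste more than a second on this" budget.
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			err := lookup(ctx, d)
			cancel() // release the timer now; defer would pile up in a loop
			if errors.Is(err, context.DeadlineExceeded) {
				fmt.Println("skipped slow entry")
				continue
			}
			fmt.Println("processed entry")
		}
	}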
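
Finally, the new mrfMU field is what "serialize queueMRFHeal()" refers to: callers such as the persist and process loops now take turns over the read-queue-delete pass on the node's MRF file instead of racing over it. A toy sketch of that serialization, with the runs counter standing in for the critical section:

	package main

	import (
		"fmt"
		"sync"
	)

	type pool struct {
		mrfMU sync.Mutex
		runs  int
	}

	// queueMRFHeal-style method: the mutex guarantees the critical
	// section (read file, queue heals, delete file) never overlaps
	// across concurrent callers.
	func (p *pool) queueMRFHeal() {
		p.mrfMU.Lock()
		defer p.mrfMU.Unlock()
		p.runs++
	}

	func main() {
		var p pool
		var wg sync.WaitGroup
		for i := 0; i < 8; i++ {
			wg.Add(1)
			go func() { defer wg.Done(); p.queueMRFHeal() }()
		}
		wg.Wait()
		fmt.Println(p.runs) // always 8, one pass at a time
	}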