diff --git a/cmd/admin-bucket-handlers.go b/cmd/admin-bucket-handlers.go index fc9b0d04e..13baa2e3c 100644 --- a/cmd/admin-bucket-handlers.go +++ b/cmd/admin-bucket-handlers.go @@ -1129,3 +1129,63 @@ func (a adminAPIHandlers) ReplicationDiffHandler(w http.ResponseWriter, r *http. } } } + +// ReplicationMRFHandler - POST returns info on entries in the MRF backlog for a node or all nodes +func (a adminAPIHandlers) ReplicationMRFHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "ReplicationMRF") + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.ReplicationDiff) + if objectAPI == nil { + return + } + + // Check if bucket exists. + if bucket != "" { + if _, err := objectAPI.GetBucketInfo(ctx, bucket, BucketOptions{}); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + } + + q := r.Form + node := q.Get("node") + + keepAliveTicker := time.NewTicker(500 * time.Millisecond) + defer keepAliveTicker.Stop() + + mrfCh, err := globalNotificationSys.GetReplicationMRF(ctx, bucket, node) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + enc := json.NewEncoder(w) + for { + select { + case entry, ok := <-mrfCh: + if !ok { + return + } + if err := enc.Encode(entry); err != nil { + return + } + if len(mrfCh) == 0 { + // Flush if nothing is queued + w.(http.Flusher).Flush() + } + case <-keepAliveTicker.C: + if len(mrfCh) > 0 { + continue + } + if _, err := w.Write([]byte(" ")); err != nil { + return + } + w.(http.Flusher).Flush() + case <-ctx.Done(): + return + } + } +} diff --git a/cmd/admin-router.go b/cmd/admin-router.go index dda6ee09c..e32d8dd22 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -226,6 +226,9 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) { // ReplicationDiff - MinIO extension API adminRouter.Methods(http.MethodPost).Path(adminVersion+"/replication/diff").HandlerFunc( gz(httpTraceHdrs(adminAPI.ReplicationDiffHandler))).Queries("bucket", "{bucket:.*}") + // ReplicationMRFHandler - MinIO extension API + adminRouter.Methods(http.MethodGet).Path(adminVersion+"/replication/mrf").HandlerFunc( + gz(httpTraceHdrs(adminAPI.ReplicationMRFHandler))).Queries("bucket", "{bucket:.*}") // Batch job operations adminRouter.Methods(http.MethodPost).Path(adminVersion + "/start-job").HandlerFunc( diff --git a/cmd/bucket-replication-utils.go b/cmd/bucket-replication-utils.go index d2e8ec9d0..04f0a6ef1 100644 --- a/cmd/bucket-replication-utils.go +++ b/cmd/bucket-replication-utils.go @@ -785,9 +785,10 @@ const ( // MRFReplicateEntry mrf entry to save to disk type MRFReplicateEntry struct { - Bucket string `json:"bucket" msg:"b"` - Object string `json:"object" msg:"o"` - versionID string `json:"-"` + Bucket string `json:"bucket" msg:"b"` + Object string `json:"object" msg:"o"` + versionID string `json:"-"` + RetryCount int `json:"retryCount" msg:"rc"` } // MRFReplicateEntries has the map of MRF entries to save to disk @@ -799,9 +800,10 @@ type MRFReplicateEntries struct { // ToMRFEntry returns the relevant info needed by MRF func (ri ReplicateObjectInfo) ToMRFEntry() MRFReplicateEntry { return MRFReplicateEntry{ - Bucket: ri.Bucket, - Object: ri.Name, - versionID: ri.VersionID, + Bucket: ri.Bucket, + Object: ri.Name, + versionID: ri.VersionID, + RetryCount: int(ri.RetryCount), } } diff --git a/cmd/bucket-replication-utils_gen.go b/cmd/bucket-replication-utils_gen.go index d69039c44..4a1078fd6 100644 --- a/cmd/bucket-replication-utils_gen.go +++ b/cmd/bucket-replication-utils_gen.go @@ -327,6 +327,12 @@ func (z *MRFReplicateEntries) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Entries", za0001, "Object") return } + case "rc": + za0002.RetryCount, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "Entries", za0001, "RetryCount") + return + } default: err = dc.Skip() if err != nil { @@ -373,9 +379,9 @@ func (z *MRFReplicateEntries) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Entries") return } - // map header, size 2 + // map header, size 3 // write "b" - err = en.Append(0x82, 0xa1, 0x62) + err = en.Append(0x83, 0xa1, 0x62) if err != nil { return } @@ -394,6 +400,16 @@ func (z *MRFReplicateEntries) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Entries", za0001, "Object") return } + // write "rc" + err = en.Append(0xa2, 0x72, 0x63) + if err != nil { + return + } + err = en.WriteInt(za0002.RetryCount) + if err != nil { + err = msgp.WrapError(err, "Entries", za0001, "RetryCount") + return + } } // write "v" err = en.Append(0xa1, 0x76) @@ -417,13 +433,16 @@ func (z *MRFReplicateEntries) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.AppendMapHeader(o, uint32(len(z.Entries))) for za0001, za0002 := range z.Entries { o = msgp.AppendString(o, za0001) - // map header, size 2 + // map header, size 3 // string "b" - o = append(o, 0x82, 0xa1, 0x62) + o = append(o, 0x83, 0xa1, 0x62) o = msgp.AppendString(o, za0002.Bucket) // string "o" o = append(o, 0xa1, 0x6f) o = msgp.AppendString(o, za0002.Object) + // string "rc" + o = append(o, 0xa2, 0x72, 0x63) + o = msgp.AppendInt(o, za0002.RetryCount) } // string "v" o = append(o, 0xa1, 0x76) @@ -498,6 +517,12 @@ func (z *MRFReplicateEntries) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Entries", za0001, "Object") return } + case "rc": + za0002.RetryCount, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Entries", za0001, "RetryCount") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -532,7 +557,7 @@ func (z *MRFReplicateEntries) Msgsize() (s int) { if z.Entries != nil { for za0001, za0002 := range z.Entries { _ = za0002 - s += msgp.StringPrefixSize + len(za0001) + 1 + 2 + msgp.StringPrefixSize + len(za0002.Bucket) + 2 + msgp.StringPrefixSize + len(za0002.Object) + s += msgp.StringPrefixSize + len(za0001) + 1 + 2 + msgp.StringPrefixSize + len(za0002.Bucket) + 2 + msgp.StringPrefixSize + len(za0002.Object) + 3 + msgp.IntSize } } s += 2 + msgp.IntSize @@ -569,6 +594,12 @@ func (z *MRFReplicateEntry) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Object") return } + case "rc": + z.RetryCount, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "RetryCount") + return + } default: err = dc.Skip() if err != nil { @@ -582,9 +613,9 @@ func (z *MRFReplicateEntry) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z MRFReplicateEntry) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 2 + // map header, size 3 // write "b" - err = en.Append(0x82, 0xa1, 0x62) + err = en.Append(0x83, 0xa1, 0x62) if err != nil { return } @@ -603,19 +634,32 @@ func (z MRFReplicateEntry) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Object") return } + // write "rc" + err = en.Append(0xa2, 0x72, 0x63) + if err != nil { + return + } + err = en.WriteInt(z.RetryCount) + if err != nil { + err = msgp.WrapError(err, "RetryCount") + return + } return } // MarshalMsg implements msgp.Marshaler func (z MRFReplicateEntry) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 2 + // map header, size 3 // string "b" - o = append(o, 0x82, 0xa1, 0x62) + o = append(o, 0x83, 0xa1, 0x62) o = msgp.AppendString(o, z.Bucket) // string "o" o = append(o, 0xa1, 0x6f) o = msgp.AppendString(o, z.Object) + // string "rc" + o = append(o, 0xa2, 0x72, 0x63) + o = msgp.AppendInt(o, z.RetryCount) return } @@ -649,6 +693,12 @@ func (z *MRFReplicateEntry) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Object") return } + case "rc": + z.RetryCount, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "RetryCount") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -663,7 +713,7 @@ func (z *MRFReplicateEntry) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z MRFReplicateEntry) Msgsize() (s int) { - s = 1 + 2 + msgp.StringPrefixSize + len(z.Bucket) + 2 + msgp.StringPrefixSize + len(z.Object) + s = 1 + 2 + msgp.StringPrefixSize + len(z.Bucket) + 2 + msgp.StringPrefixSize + len(z.Object) + 3 + msgp.IntSize return } diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 36eddf13b..e91d86127 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -27,7 +27,9 @@ import ( "math/rand" "net/http" "net/url" + "os" "path" + "path/filepath" "reflect" "strings" "sync" @@ -1127,7 +1129,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj } if globalBucketTargetSys.isOffline(tgt.EndpointURL()) { - logger.LogOnceIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, tgt.ARN), "replication-target-offline-obj-"+tgt.ARN) + logger.LogOnceIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s retry:%d", bucket, tgt.ARN, ri.RetryCount), "replication-target-offline"+tgt.ARN) sendEvent(eventArgs{ EventName: event.ObjectReplicationNotTracked, BucketName: bucket, @@ -1277,7 +1279,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object } if globalBucketTargetSys.isOffline(tgt.EndpointURL()) { - logger.LogOnceIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", bucket, tgt.ARN), "replication-target-offline-all-"+tgt.ARN) + logger.LogOnceIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s retry:%d", bucket, tgt.ARN, ri.RetryCount), "replication-target-offline-heal"+tgt.ARN) sendEvent(eventArgs{ EventName: event.ObjectReplicationNotTracked, BucketName: bucket, @@ -2853,7 +2855,7 @@ func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, } // QueueReplicationHeal is a wrapper for queueReplicationHeal -func QueueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo) { +func QueueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, retryCount int) { // un-versioned or a prefix if oi.VersionID == "" || oi.ModTime.IsZero() { return @@ -2863,12 +2865,12 @@ func QueueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo) { queueReplicationHeal(ctx, bucket, oi, replicationConfig{ Config: rcfg, remotes: tgts, - }) + }, retryCount) } // queueReplicationHeal enqueues objects that failed replication OR eligible for resyncing through // an ongoing resync operation or via existing objects replication configuration setting. -func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcfg replicationConfig) (roi ReplicateObjectInfo) { +func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcfg replicationConfig, retryCount int) (roi ReplicateObjectInfo) { // un-versioned or a prefix if oi.VersionID == "" || oi.ModTime.IsZero() { return roi @@ -2878,6 +2880,7 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf return roi } roi = getHealReplicateObjectInfo(oi, rcfg) + roi.RetryCount = uint32(retryCount) if !roi.Dsc.ReplicateAny() { return } @@ -2939,7 +2942,13 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf return } -const mrfTimeInterval = 5 * time.Minute +const ( + mrfSaveInterval = 5 * time.Minute + mrfQueueInterval = 6 * time.Minute + + mrfRetryLimit = 3 // max number of retries before letting scanner catch up on this object version + mrfMaxEntries = 1000000 +) func (p *ReplicationPool) persistMRF() { if !p.initialized() { @@ -2948,7 +2957,7 @@ func (p *ReplicationPool) persistMRF() { var mu sync.Mutex entries := make(map[string]MRFReplicateEntry) - mTimer := time.NewTimer(mrfTimeInterval) + mTimer := time.NewTimer(mrfSaveInterval) defer mTimer.Stop() saveMRFToDisk := func(drain bool) { mu.Lock() @@ -2964,8 +2973,11 @@ func (p *ReplicationPool) persistMRF() { entries[e.versionID] = e } } + // queue all entries for healing before overwriting the node mrf file + p.queueMRFHeal() + if err := p.saveMRFEntries(cctx, entries); err != nil { - logger.LogOnceIf(p.ctx, fmt.Errorf("Unable to persist replication failures to disk:%w", err), string(replicationSubsystem)) + logger.LogOnceIf(p.ctx, fmt.Errorf("unable to persist replication failures to disk:%w", err), string(replicationSubsystem)) } entries = make(map[string]MRFReplicateEntry) } @@ -2973,7 +2985,7 @@ func (p *ReplicationPool) persistMRF() { select { case <-mTimer.C: saveMRFToDisk(false) - mTimer.Reset(mrfTimeInterval) + mTimer.Reset(mrfSaveInterval) case <-p.ctx.Done(): p.mrfStopCh <- struct{}{} close(p.mrfSaveCh) @@ -2991,7 +3003,7 @@ func (p *ReplicationPool) persistMRF() { entries[e.versionID] = e cnt = len(entries) mu.Unlock() - if cnt >= cap(p.mrfSaveCh) || len(p.mrfSaveCh) >= int(0.8*float32(cap(p.mrfSaveCh))) { + if cnt >= mrfMaxEntries { saveMRFToDisk(true) } } @@ -3002,6 +3014,9 @@ func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) { if !p.initialized() { return } + if entry.RetryCount > mrfRetryLimit { + return + } select { case <-GlobalContext.Done(): return @@ -3015,7 +3030,7 @@ func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) { } } -// save mrf entries to mrf_.bin +// save mrf entries to nodenamehex.bin func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string]MRFReplicateEntry) error { if !p.initialized() { return nil @@ -3031,17 +3046,32 @@ func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string data := make([]byte, 4, v.Msgsize()+4) // Initialize the resync meta header. - binary.LittleEndian.PutUint16(data[0:2], resyncMetaFormat) - binary.LittleEndian.PutUint16(data[2:4], resyncMetaVersion) + binary.LittleEndian.PutUint16(data[0:2], mrfMetaFormat) + binary.LittleEndian.PutUint16(data[2:4], mrfMetaVersion) buf, err := v.MarshalMsg(data) if err != nil { return err } - configFile := path.Join(replicationMRFDir, mustGetUUID()+".bin") - err = saveConfig(ctx, p.objLayer, configFile, buf) - return err + for _, diskPath := range globalEndpoints.LocalDisksPaths() { + // write to first drive + mrfDir := filepath.Join(diskPath, minioMetaBucket, replicationMRFDir) + mrfFileName := filepath.Join(mrfDir, globalLocalNodeNameHex+".bin") + if err := os.MkdirAll(mrfDir, 0o777); err != nil { + return err + } + file, err := OpenFile(mrfFileName, os.O_CREATE|os.O_WRONLY|writeMode, 0o666) + if err != nil { + continue + } + defer file.Close() + if _, err = file.Write(buf); err != nil { + return err + } + break + } + return nil } // load mrf entries from disk @@ -3049,9 +3079,14 @@ func (p *ReplicationPool) loadMRF(fileName string) (re MRFReplicateEntries, e er if !p.initialized() { return re, nil } + file, err := Open(fileName) + if err != nil { + return re, err + } + defer file.Close() - data, err := readConfig(p.ctx, p.objLayer, fileName) - if err != nil && err != errConfigNotFound { + data, err := io.ReadAll(file) + if err != nil { return re, err } if len(data) == 0 { @@ -3089,7 +3124,7 @@ func (p *ReplicationPool) processMRF() { if !p.initialized() { return } - pTimer := time.NewTimer(mrfTimeInterval) + pTimer := time.NewTimer(mrfQueueInterval) defer pTimer.Stop() for { select { @@ -3103,24 +3138,13 @@ func (p *ReplicationPool) processMRF() { } } if len(tgts) == offlineCnt { - pTimer.Reset(mrfTimeInterval) + pTimer.Reset(mrfQueueInterval) continue } - objCh := make(chan ObjectInfo) - cctx, cancelFn := context.WithCancel(p.ctx) - if err := p.objLayer.Walk(cctx, minioMetaBucket, replicationMRFDir, objCh, ObjectOptions{}); err != nil { - pTimer.Reset(mrfTimeInterval) - cancelFn() + if err := p.queueMRFHeal(); err != nil && !osIsNotExist(err) { logger.LogIf(p.ctx, err) - continue } - for item := range objCh { - if err := p.queueMRFHeal(item.Name); err == nil { - p.objLayer.DeleteObject(p.ctx, minioMetaBucket, item.Name, ObjectOptions{}) - } - } - pTimer.Reset(mrfTimeInterval) - cancelFn() + pTimer.Reset(mrfQueueInterval) case <-p.ctx.Done(): return } @@ -3128,24 +3152,36 @@ func (p *ReplicationPool) processMRF() { } // process sends error logs to the heal channel for an attempt to heal replication. -func (p *ReplicationPool) queueMRFHeal(file string) error { +func (p *ReplicationPool) queueMRFHeal() error { if !p.initialized() { return errServerNotInitialized } - mrfRec, err := p.loadMRF(file) - if err != nil { - return err - } - for vID, e := range mrfRec.Entries { - oi, err := p.objLayer.GetObjectInfo(p.ctx, e.Bucket, e.Object, ObjectOptions{ - VersionID: vID, - }) + for _, diskPath := range globalEndpoints.LocalDisksPaths() { + fileName := filepath.Join(diskPath, minioMetaBucket, replicationMRFDir, globalLocalNodeNameHex+".bin") + mrfRec, err := p.loadMRF(fileName) if err != nil { - continue + return err } - QueueReplicationHeal(p.ctx, e.Bucket, oi) + // finally delete the file after processing mrf entries + os.Remove(fileName) + + // queue replication heal in a goroutine to avoid holding up mrf save routine + go func(mrfRec MRFReplicateEntries) { + for vID, e := range mrfRec.Entries { + + oi, err := p.objLayer.GetObjectInfo(p.ctx, e.Bucket, e.Object, ObjectOptions{ + VersionID: vID, + }) + if err != nil { + continue + } + QueueReplicationHeal(p.ctx, e.Bucket, oi, e.RetryCount) + } + }(mrfRec) + break } + return nil } @@ -3229,3 +3265,36 @@ func (p *ReplicationPool) saveStats(ctx context.Context) error { } return saveConfig(ctx, p.objLayer, getReplicationStatsPath(), data) } + +// getMRF returns MRF entries for this node. +func (p *ReplicationPool) getMRF(ctx context.Context, bucket string) (ch chan madmin.ReplicationMRF, err error) { + mrfCh := make(chan madmin.ReplicationMRF, 100) + go func() { + defer close(mrfCh) + for _, diskPath := range globalEndpoints.LocalDisksPaths() { + file := filepath.Join(diskPath, minioMetaBucket, replicationMRFDir, globalLocalNodeNameHex+".bin") + mrfRec, err := p.loadMRF(file) + if err != nil { + break + } + for vID, e := range mrfRec.Entries { + if e.Bucket != bucket && bucket != "" { + continue + } + select { + case mrfCh <- madmin.ReplicationMRF{ + NodeName: globalLocalNodeName, + Object: e.Object, + VersionID: vID, + Bucket: e.Bucket, + RetryCount: e.RetryCount, + }: + case <-ctx.Done(): + return + } + } + } + }() + + return mrfCh, nil +} diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index e39152e63..a87f404f4 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -1260,7 +1260,7 @@ func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi Obj if i.replication.Config == nil { return } - roi := queueReplicationHeal(ctx, oi.Bucket, oi, i.replication) + roi := queueReplicationHeal(ctx, oi.Bucket, oi, i.replication, 0) if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() { return } diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go index 014ca526b..e6a7c7f23 100644 --- a/cmd/metacache-server-pool.go +++ b/cmd/metacache-server-pool.go @@ -402,7 +402,7 @@ func applyBucketActions(ctx context.Context, o listPathOptions, in <-chan metaCa } } - queueReplicationHeal(ctx, o.Bucket, objInfo, o.Replication) + queueReplicationHeal(ctx, o.Bucket, objInfo, o.Replication, 0) } } } diff --git a/cmd/notification.go b/cmd/notification.go index 16f658599..243fdb772 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -1387,3 +1387,83 @@ func (sys *NotificationSys) GetLastDayTierStats(ctx context.Context) DailyAllTie } return merged } + +// GetReplicationMRF - Get replication MRF from all peers. +func (sys *NotificationSys) GetReplicationMRF(ctx context.Context, bucket, node string) (mrfCh chan madmin.ReplicationMRF, err error) { + g := errgroup.WithNErrs(len(sys.peerClients)) + peerChannels := make([]<-chan madmin.ReplicationMRF, len(sys.peerClients)) + for index, client := range sys.peerClients { + if client == nil { + continue + } + host := client.host.String() + if host != node && node != "all" { + continue + } + index := index + g.Go(func() error { + var err error + peerChannels[index], err = sys.peerClients[index].GetReplicationMRF(ctx, bucket) + return err + }, index) + } + mrfCh = make(chan madmin.ReplicationMRF, 4000) + var wg sync.WaitGroup + + for index, err := range g.Wait() { + if err != nil { + if sys.peerClients[index] != nil { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", + sys.peerClients[index].host.String()) + logger.LogOnceIf(logger.SetReqInfo(ctx, reqInfo), err, sys.peerClients[index].host.String()) + } else { + logger.LogOnceIf(ctx, err, "peer-offline") + } + continue + } + wg.Add(1) + go func(ctx context.Context, peerChannel <-chan madmin.ReplicationMRF, wg *sync.WaitGroup) { + defer wg.Done() + for { + select { + case m, ok := <-peerChannel: + if !ok { + return + } + select { + case <-ctx.Done(): + return + case mrfCh <- m: + } + case <-ctx.Done(): + return + } + } + }(ctx, peerChannels[index], &wg) + } + wg.Add(1) + go func(ch chan madmin.ReplicationMRF) error { + defer wg.Done() + if node != "all" && node != globalLocalNodeName { + return nil + } + mCh, err := globalReplicationPool.getMRF(ctx, bucket) + if err != nil { + return err + } + for e := range mCh { + select { + case <-ctx.Done(): + return err + default: + mrfCh <- e + } + } + return nil + }(mrfCh) + go func(wg *sync.WaitGroup) { + wg.Wait() + defer close(mrfCh) + }(&wg) + return mrfCh, nil +} diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 195a7a790..76234d89c 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -454,7 +454,7 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj w.Header()[xhttp.AmzVersionID] = []string{gr.ObjInfo.VersionID} w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(gr.ObjInfo.DeleteMarker)} } - QueueReplicationHeal(ctx, bucket, gr.ObjInfo) + QueueReplicationHeal(ctx, bucket, gr.ObjInfo, 0) } writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return @@ -489,7 +489,7 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj } } - QueueReplicationHeal(ctx, bucket, gr.ObjInfo) + QueueReplicationHeal(ctx, bucket, gr.ObjInfo, 0) } // filter object lock metadata if permission does not permit @@ -715,7 +715,7 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob w.Header()[xhttp.AmzVersionID] = []string{objInfo.VersionID} w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(objInfo.DeleteMarker)} } - QueueReplicationHeal(ctx, bucket, objInfo) + QueueReplicationHeal(ctx, bucket, objInfo, 0) // do an additional verification whether object exists when object is deletemarker and request // is from replication if opts.CheckDMReplicationReady { @@ -746,7 +746,7 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob } } } - QueueReplicationHeal(ctx, bucket, objInfo) + QueueReplicationHeal(ctx, bucket, objInfo, 0) } // filter object lock metadata if permission does not permit diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index f81acbfcd..6d8ed4d3e 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -944,3 +944,34 @@ func (client *peerRESTClient) Netperf(ctx context.Context, duration time.Duratio err = gob.NewDecoder(respBody).Decode(&result) return result, err } + +// GetReplicationMRF - get replication MRF for bucket +func (client *peerRESTClient) GetReplicationMRF(ctx context.Context, bucket string) (chan madmin.ReplicationMRF, error) { + values := make(url.Values) + values.Set(peerRESTBucket, bucket) + + respBody, err := client.callWithContext(ctx, peerRESTMethodGetReplicationMRF, values, nil, -1) + if err != nil { + return nil, err + } + dec := gob.NewDecoder(respBody) + ch := make(chan madmin.ReplicationMRF) + go func(ch chan madmin.ReplicationMRF) { + defer func() { + xhttp.DrainBody(respBody) + close(ch) + }() + for { + var entry madmin.ReplicationMRF + if err := dec.Decode(&entry); err != nil { + return + } + select { + case <-ctx.Done(): + return + case ch <- entry: + } + } + }(ch) + return ch, nil +} diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index 611c1b9cf..5e4d36caa 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -18,7 +18,7 @@ package cmd const ( - peerRESTVersion = "v30" // Removed bloom filter + peerRESTVersion = "v31" // Add replication MRF peerRESTVersionPrefix = SlashSeparator + peerRESTVersion peerRESTPrefix = minioReservedBucketPath + "/peer" @@ -76,6 +76,7 @@ const ( peerRESTMethodDevNull = "/devnull" peerRESTMethodNetperf = "/netperf" peerRESTMethodMetrics = "/metrics" + peerRESTMethodGetReplicationMRF = "/getreplicationmrf" ) const ( diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index ba8110fbc..3a2dd8b7b 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -1313,6 +1313,31 @@ func (s *peerRESTServer) DriveSpeedTestHandler(w http.ResponseWriter, r *http.Re logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(result)) } +// GetReplicationMRFHandler - returns replication MRF for bucket +func (s *peerRESTServer) GetReplicationMRFHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("invalid request")) + return + } + + vars := mux.Vars(r) + bucketName := vars[peerRESTBucket] + ctx := newContext(r, w, "GetReplicationMRF") + re, err := globalReplicationPool.getMRF(ctx, bucketName) + if err != nil { + s.writeErrorResponse(w, err) + return + } + enc := gob.NewEncoder(w) + + for m := range re { + if err := enc.Encode(m); err != nil { + s.writeErrorResponse(w, errors.New("Encoding mrf failed: "+err.Error())) + return + } + } +} + // DevNull - everything goes to io.Discard func (s *peerRESTServer) DevNull(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -1393,6 +1418,7 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadUser).HandlerFunc(h(server.LoadUserHandler)).Queries(restQueries(peerRESTUser, peerRESTUserTemp)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadServiceAccount).HandlerFunc(h(server.LoadServiceAccountHandler)).Queries(restQueries(peerRESTUser)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadGroup).HandlerFunc(h(server.LoadGroupHandler)).Queries(restQueries(peerRESTGroup)...) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetReplicationMRF).HandlerFunc(httpTraceHdrs(server.GetReplicationMRFHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStartProfiling).HandlerFunc(h(server.StartProfilingHandler)).Queries(restQueries(peerRESTProfiler)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDownloadProfilingData).HandlerFunc(h(server.DownloadProfilingDataHandler))