From 4a6af93c8398bcb8bd378dc2b923a4712ba850a4 Mon Sep 17 00:00:00 2001 From: Poorna Date: Thu, 24 Aug 2023 09:24:26 -0700 Subject: [PATCH] mark replication target offline if network timeouts seen (#17907) regular target liveness check every 5 secs will toggle state back as target returns online. --- cmd/bucket-replication.go | 20 ++++++++++++++++++-- cmd/bucket-targets.go | 10 ++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index b98b5515e..e9d015ede 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -632,6 +632,9 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI return } default: + if err != nil && minio.IsNetworkOrHostDown(err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) { + globalBucketTargetSys.markOffline(tgt.EndpointURL()) + } // mark delete marker replication as failed if target cluster not ready to receive // this request yet (object version not replicated yet) if err != nil && !toi.ReplicationReady { @@ -656,6 +659,9 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI rinfo.VersionPurgeStatus = Failed } logger.LogIf(ctx, fmt.Errorf("Unable to replicate delete marker to %s/%s(%s): %s", tgt.Bucket, dobj.ObjectName, versionID, rmErr)) + if rmErr != nil && minio.IsNetworkOrHostDown(rmErr, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) { + globalBucketTargetSys.markOffline(tgt.EndpointURL()) + } } else { if dobj.VersionID == "" { rinfo.ReplicationStatus = replication.Completed @@ -1217,7 +1223,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj } r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts) if objInfo.isMultipart() { - if err := replicateObjectWithMultipart(ctx, c, tgt.Bucket, object, + if err = replicateObjectWithMultipart(ctx, c, tgt.Bucket, object, r, objInfo, putOpts); err != nil { if minio.ToErrorResponse(err).Code != "PreconditionFailed" { rinfo.ReplicationStatus = replication.Failed @@ -1232,6 +1238,9 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj } } } + if err != nil && minio.IsNetworkOrHostDown(err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) { + globalBucketTargetSys.markOffline(tgt.EndpointURL()) + } return } @@ -1375,6 +1384,10 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object } // if target returns error other than NoSuchKey, defer replication attempt if cerr != nil { + if minio.IsNetworkOrHostDown(cerr, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) { + globalBucketTargetSys.markOffline(tgt.EndpointURL()) + } + errResp := minio.ToErrorResponse(cerr) switch errResp.Code { case "NoSuchKey", "NoSuchVersion", "SlowDownRead": @@ -1446,7 +1459,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object } r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts) if objInfo.isMultipart() { - if err := replicateObjectWithMultipart(ctx, c, tgt.Bucket, object, + if err = replicateObjectWithMultipart(ctx, c, tgt.Bucket, object, r, objInfo, putOpts); err != nil { if minio.ToErrorResponse(err).Code != "PreconditionFailed" { rinfo.ReplicationStatus = replication.Failed @@ -1465,6 +1478,9 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object } } } + if err != nil && minio.IsNetworkOrHostDown(err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) { + globalBucketTargetSys.markOffline(tgt.EndpointURL()) + } } return } diff --git a/cmd/bucket-targets.go b/cmd/bucket-targets.go index 26a862267..a4fd9833d 100644 --- a/cmd/bucket-targets.go +++ b/cmd/bucket-targets.go @@ -69,6 +69,16 @@ func (sys *BucketTargetSys) isOffline(ep *url.URL) bool { return false } +// markOffline sets endpoint to offline if network i/o timeout seen. +func (sys *BucketTargetSys) markOffline(ep *url.URL) { + sys.hMutex.Lock() + defer sys.hMutex.Unlock() + if h, ok := sys.hc[ep.Host]; ok { + h.Online = false + sys.hc[ep.Host] = h + } +} + func (sys *BucketTargetSys) initHC(ep *url.URL) { sys.hMutex.Lock() sys.hc[ep.Host] = epHealth{