From c56a139fdcafd12b712ec683dfe8d9a25cf565ca Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Tue, 26 Apr 2022 20:06:41 -0700
Subject: [PATCH] fix: support decommissioning directory objects (#14822)

Improvements in this PR include:

- decommission objects that have the `__XLDIR__` suffix
- decommission objects that have a `null` version on a versioned bucket
- look for any "decom" failures so that we do not wrongly conclude the
  decommission as complete before all files have been copied over
- break out eagerly upon the first error for objects with multiple
  versions, leaving the object as-is to support debugging and analysis

---
 cmd/erasure-server-pool-decom.go | 72 ++++++++++++++++++++------------
 1 file changed, 45 insertions(+), 27 deletions(-)

diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go
index 92495f182..c2892d853 100644
--- a/cmd/erasure-server-pool-decom.go
+++ b/cmd/erasure-server-pool-decom.go
@@ -605,6 +605,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 				<-parallelWorkers
 				wg.Done()
 			}()
+
 			if entry.isDir() {
 				return
 			}
@@ -618,15 +619,18 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 			// to create the appropriate stack.
 			versionsSorter(fivs.Versions).reverse()
 
+			var decommissionedCount int
 			for _, version := range fivs.Versions {
 				// TODO: Skip transitioned objects for now.
 				if version.IsRemote() {
+					logger.LogIf(ctx, fmt.Errorf("found %s/%s transitioned object, transitioned object won't be decommissioned", bName, version.Name))
 					continue
 				}
 				// We will skip decommissioning delete markers
 				// with single version, its as good as there
 				// is no data associated with the object.
 				if version.Deleted && len(fivs.Versions) == 1 {
+					logger.LogIf(ctx, fmt.Errorf("found %s/%s delete marked object with no other versions, skipping since there is no content left", bName, version.Name))
 					continue
 				}
 				if version.Deleted {
@@ -639,27 +643,24 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 						MTime:             version.ModTime,
 						DeleteReplication: version.ReplicationState,
 					})
+					var failure bool
 					if err != nil {
 						logger.LogIf(ctx, err)
-						z.poolMetaMutex.Lock()
-						z.poolMeta.CountItem(idx, 0, true)
-						z.poolMetaMutex.Unlock()
-					} else {
-						set.DeleteObject(ctx,
-							bName,
-							version.Name,
-							ObjectOptions{
-								VersionID: version.VersionID,
-							})
-						z.poolMetaMutex.Lock()
-						z.poolMeta.CountItem(idx, 0, false)
-						z.poolMetaMutex.Unlock()
+						failure = true
 					}
+					z.poolMetaMutex.Lock()
+					z.poolMeta.CountItem(idx, 0, failure)
+					z.poolMetaMutex.Unlock()
+					if failure {
+						break // break out on first error
+					}
+					decommissionedCount++
 					continue
 				}
+
 				gr, err := set.GetObjectNInfo(ctx,
 					bName,
-					version.Name,
+					encodeDirObject(version.Name),
 					nil,
 					http.Header{},
 					noLock, // all mutations are blocked reads are safe without locks.
@@ -671,25 +672,32 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 					z.poolMetaMutex.Lock()
 					z.poolMeta.CountItem(idx, version.Size, true)
 					z.poolMetaMutex.Unlock()
-					continue
+					break // break out on first error
 				}
+				var failure bool
 				// gr.Close() is ensured by decommissionObject().
 				if err = z.decommissionObject(ctx, bName, gr); err != nil {
 					logger.LogIf(ctx, err)
-					z.poolMetaMutex.Lock()
-					z.poolMeta.CountItem(idx, version.Size, true)
-					z.poolMetaMutex.Unlock()
-					continue
+					failure = true
 				}
+				z.poolMetaMutex.Lock()
+				z.poolMeta.CountItem(idx, version.Size, failure)
+				z.poolMetaMutex.Unlock()
+				if failure {
+					break // break out on first error
+				}
+				decommissionedCount++
+			}
+
+			// if all versions were decommissioned, then we can delete the object versions.
+			if decommissionedCount == len(fivs.Versions) {
 				set.DeleteObject(ctx,
 					bName,
-					version.Name,
+					entry.name,
 					ObjectOptions{
-						VersionID: version.VersionID,
-					})
-				z.poolMetaMutex.Lock()
-				z.poolMeta.CountItem(idx, version.Size, false)
-				z.poolMetaMutex.Unlock()
+						DeletePrefix: true, // use prefix delete to delete all versions at once.
+					},
+				)
 			}
 			z.poolMetaMutex.Lock()
 			z.poolMeta.TrackCurrentBucketObject(idx, bName, entry.name)
@@ -779,8 +787,18 @@ func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx in
 		logger.LogIf(GlobalContext, z.DecommissionFailed(dctx, idx))
 		return
 	}
-	// Complete the decommission..
-	logger.LogIf(GlobalContext, z.CompleteDecommission(dctx, idx))
+
+	z.poolMetaMutex.Lock()
+	failed := z.poolMeta.Pools[idx].Decommission.ItemsDecommissionFailed > 0
+	z.poolMetaMutex.Unlock()
+
+	if failed {
+		// Decommission failed indicate as such.
+		logger.LogIf(GlobalContext, z.DecommissionFailed(dctx, idx))
+	} else {
+		// Complete the decommission..
+		logger.LogIf(GlobalContext, z.CompleteDecommission(dctx, idx))
+	}
 }
 
 func (z *erasureServerPools) IsSuspended(idx int) bool {
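
Note on the version-accounting pattern introduced above: each object's
versions are copied one by one, the first failure breaks out of the loop so
the source object is left intact for debugging, and the source is deleted
(via a single DeletePrefix delete of all versions at once) only after every
version has been copied. A minimal, self-contained Go sketch of that
pattern, where copyVersion and decommissionEntry are hypothetical stand-ins
for the real set.GetObjectNInfo/decommissionObject/set.DeleteObject calls:

package main

import (
	"errors"
	"fmt"
)

// copyVersion stands in for streaming one object version to the remaining
// pools; it fails for one version to demonstrate the early break.
func copyVersion(v string) error {
	if v == "v2-bad" {
		return errors.New("copy failed")
	}
	return nil
}

func decommissionEntry(versions []string) {
	decommissionedCount := 0
	for _, v := range versions {
		if err := copyVersion(v); err != nil {
			fmt.Printf("version %s failed: %v\n", v, err)
			break // break out on first error, keep the object for analysis
		}
		decommissionedCount++
	}
	// Only when every version made it across is the source object removed,
	// mirroring the DeletePrefix delete of all versions at once.
	if decommissionedCount == len(versions) {
		fmt.Println("all versions copied; deleting source object")
	} else {
		fmt.Println("incomplete; leaving source object in place")
	}
}

func main() {
	decommissionEntry([]string{"v1", "v2-bad", "v3"}) // leaves the object
	decommissionEntry([]string{"v1", "v2", "v3"})     // deletes the object
}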
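
Similarly, a hedged sketch of the new completion decision in
doDecommissionInRoutine: a failure counter is maintained behind a mutex
while workers count items, and the pool is marked complete only if that
counter stayed at zero. The poolMeta type and its field names here are
hypothetical simplifications of the real metadata, kept only to show the
decision:

package main

import (
	"fmt"
	"sync"
)

// poolMeta is a simplified stand-in for the real pool metadata; only the
// failed-items counter relevant to the completion decision is kept.
type poolMeta struct {
	mu                      sync.Mutex
	itemsDecommissionFailed int
}

// countItem records one processed item, tracking failures.
func (p *poolMeta) countItem(failed bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if failed {
		p.itemsDecommissionFailed++
	}
}

// finish decides between "failed" and "complete", mirroring the check
// added at the end of doDecommissionInRoutine.
func (p *poolMeta) finish() {
	p.mu.Lock()
	failed := p.itemsDecommissionFailed > 0
	p.mu.Unlock()
	if failed {
		fmt.Println("decommission failed: some items were not copied over")
		return
	}
	fmt.Println("decommission complete")
}

func main() {
	var p poolMeta
	p.countItem(false)
	p.countItem(true) // a single failure blocks completion
	p.finish()        // prints the failed branch
}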