diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go
index 616ba2770..e7c61c961 100644
--- a/cmd/admin-heal-ops.go
+++ b/cmd/admin-heal-ops.go
@@ -710,6 +710,9 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
 
 	select {
 	case globalBackgroundHealRoutine.tasks <- task:
+		if serverDebugLog {
+			logger.Info("Task in the queue: %#v", task)
+		}
 	case <-h.ctx.Done():
 		return nil
 	}
@@ -885,6 +888,7 @@ func (h *healSequence) healObject(bucket, object, versionID string) error {
 		bucket:    bucket,
 		object:    object,
 		versionID: versionID,
+		opts:      &h.settings,
 	}, madmin.HealItemObject)
 
 	// Wait and proceed if there are active requests
diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go
index 0454d9339..7e800d252 100644
--- a/cmd/erasure-healing.go
+++ b/cmd/erasure-healing.go
@@ -954,13 +954,9 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
 	// This allows to quickly check if all is ok or all are missing.
 	_, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false)
 	if isAllNotFound(errs) {
-		err = toObjectErr(errFileNotFound, bucket, object)
-		if versionID != "" {
-			err = toObjectErr(errFileVersionNotFound, bucket, object, versionID)
-		}
 		// Nothing to do, file is already gone.
 		return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints,
-			errs, bucket, object, versionID), err
+			errs, bucket, object, versionID), nil
 	}
 
 	// Heal the object.
diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go
index e83db0be4..943d823b3 100644
--- a/cmd/erasure-healing_test.go
+++ b/cmd/erasure-healing_test.go
@@ -22,6 +22,7 @@ import (
 	"context"
 	"crypto/rand"
 	"crypto/sha256"
+	"errors"
 	"io"
 	"os"
 	"path"
@@ -97,12 +98,12 @@ func TestHealing(t *testing.T) {
 		t.Fatal("HealObject failed")
 	}
 
-	err = os.RemoveAll(path.Join(fsDirs[0], bucket, object, "er.meta"))
+	err = os.RemoveAll(path.Join(fsDirs[0], bucket, object, "xl.meta"))
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	// Write er.meta with different modtime to simulate the case where a disk had
+	// Write xl.meta with different modtime to simulate the case where a disk had
 	// gone down when an object was replaced by a new object.
 	fileInfoOutDated := fileInfoPreHeal
 	fileInfoOutDated.ModTime = time.Now()
@@ -328,6 +329,173 @@ func TestHealingDanglingObject(t *testing.T) {
 	}
 }
 
+func TestHealObjectCorruptedPools(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	resetGlobalHealState()
+	defer resetGlobalHealState()
+
+	nDisks := 32
+	fsDirs, err := getRandomDisks(nDisks)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	defer removeRoots(fsDirs)
+
+	pools := mustGetPoolEndpoints(fsDirs[:16]...)
+	pools = append(pools, mustGetPoolEndpoints(fsDirs[16:]...)...)
+
+	// Everything is fine, should return nil
+	objLayer, _, err := initObjectLayer(ctx, pools)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	bucket := getRandomBucketName()
+	object := getRandomObjectName()
+	data := bytes.Repeat([]byte("a"), 5*1024*1024)
+	var opts ObjectOptions
+
+	err = objLayer.MakeBucketWithLocation(ctx, bucket, BucketOptions{})
+	if err != nil {
+		t.Fatalf("Failed to make a bucket - %v", err)
+	}
+
+	// Create an object with multiple parts uploaded in decreasing
+	// part number.
+	uploadID, err := objLayer.NewMultipartUpload(ctx, bucket, object, opts)
+	if err != nil {
+		t.Fatalf("Failed to create a multipart upload - %v", err)
+	}
+
+	var uploadedParts []CompletePart
+	for _, partID := range []int{2, 1} {
+		pInfo, err1 := objLayer.PutObjectPart(ctx, bucket, object, uploadID, partID, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), opts)
+		if err1 != nil {
+			t.Fatalf("Failed to upload a part - %v", err1)
+		}
+		uploadedParts = append(uploadedParts, CompletePart{
+			PartNumber: pInfo.PartNumber,
+			ETag:       pInfo.ETag,
+		})
+	}
+
+	_, err = objLayer.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, ObjectOptions{})
+	if err != nil {
+		t.Fatalf("Failed to complete multipart upload - %v", err)
+	}
+
+	// Test 1: Remove the object backend files from the first disk.
+	z := objLayer.(*erasureServerPools)
+	for _, set := range z.serverPools {
+		er := set.sets[0]
+		erasureDisks := er.getDisks()
+		firstDisk := erasureDisks[0]
+		err = firstDisk.Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
+		if err != nil {
+			t.Fatalf("Failed to delete a file - %v", err)
+		}
+
+		_, err = objLayer.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealNormalScan})
+		if err != nil {
+			t.Fatalf("Failed to heal object - %v", err)
+		}
+
+		fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+		fi, err := getLatestFileInfo(ctx, fileInfos, errs)
+		if errors.Is(err, errFileNotFound) {
+			continue
+		}
+		if err != nil {
+			t.Fatalf("Failed to getLatestFileInfo - %v", err)
+		}
+
+		if _, err = firstDisk.StatInfoFile(context.Background(), bucket, object+"/"+xlStorageFormatFile, false); err != nil {
+			t.Errorf("Expected xl.meta file to be present but stat failed - %v", err)
+		}
+
+		err = firstDisk.Delete(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), false)
+		if err != nil {
+			t.Errorf("Failure during deleting part.1 - %v", err)
+		}
+
+		err = firstDisk.WriteAll(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), []byte{})
+		if err != nil {
+			t.Errorf("Failure during creating part.1 - %v", err)
+		}
+
+		_, err = objLayer.HealObject(ctx, bucket, object, "", madmin.HealOpts{DryRun: false, Remove: true, ScanMode: madmin.HealDeepScan})
+		if err != nil {
+			t.Errorf("Expected nil but received %v", err)
+		}
+
+		fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+		nfi, err := getLatestFileInfo(ctx, fileInfos, errs)
+		if err != nil {
+			t.Fatalf("Failed to getLatestFileInfo - %v", err)
+		}
+
+		if !reflect.DeepEqual(fi, nfi) {
+			t.Fatalf("FileInfo not equal after healing")
+		}
+
+		err = firstDisk.Delete(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), false)
+		if err != nil {
+			t.Errorf("Failure during deleting part.1 - %v", err)
+		}
+
+		bdata := bytes.Repeat([]byte("b"), int(nfi.Size))
+		err = firstDisk.WriteAll(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), bdata)
+		if err != nil {
+			t.Errorf("Failure during creating part.1 - %v", err)
+		}
+
+		_, err = objLayer.HealObject(ctx, bucket, object, "", madmin.HealOpts{DryRun: false, Remove: true, ScanMode: madmin.HealDeepScan})
+		if err != nil {
+			t.Errorf("Expected nil but received %v", err)
+		}
+
+		fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+		nfi, err = getLatestFileInfo(ctx, fileInfos, errs)
+		if err != nil {
+			t.Fatalf("Failed to getLatestFileInfo - %v", err)
+		}
+
+		if !reflect.DeepEqual(fi, nfi) {
t.Fatalf("FileInfo not equal after healing") + } + + // Test 4: checks if HealObject returns an error when xl.meta is not found + // in more than read quorum number of disks, to create a corrupted situation. + for i := 0; i <= nfi.Erasure.DataBlocks; i++ { + erasureDisks[i].Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false) + } + + // Try healing now, expect to receive errFileNotFound. + _, err = objLayer.HealObject(ctx, bucket, object, "", madmin.HealOpts{DryRun: false, Remove: true, ScanMode: madmin.HealDeepScan}) + if err != nil { + if _, ok := err.(ObjectNotFound); !ok { + t.Errorf("Expect %v but received %v", ObjectNotFound{Bucket: bucket, Object: object}, err) + } + } + + // since majority of xl.meta's are not available, object should be successfully deleted. + _, err = objLayer.GetObjectInfo(ctx, bucket, object, ObjectOptions{}) + if _, ok := err.(ObjectNotFound); !ok { + t.Errorf("Expect %v but received %v", ObjectNotFound{Bucket: bucket, Object: object}, err) + } + + for i := 0; i < (nfi.Erasure.DataBlocks + nfi.Erasure.ParityBlocks); i++ { + _, err = erasureDisks[i].StatInfoFile(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false) + if err == nil { + t.Errorf("Expected xl.meta file to be not present, but succeeeded") + } + } + } +} + func TestHealObjectCorrupted(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -405,7 +573,7 @@ func TestHealObjectCorrupted(t *testing.T) { } if _, err = firstDisk.StatInfoFile(context.Background(), bucket, object+"/"+xlStorageFormatFile, false); err != nil { - t.Errorf("Expected er.meta file to be present but stat failed - %v", err) + t.Errorf("Expected xl.meta file to be present but stat failed - %v", err) } err = firstDisk.Delete(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), false) @@ -549,7 +717,7 @@ func TestHealObjectErasure(t *testing.T) { } if _, err = firstDisk.StatInfoFile(context.Background(), bucket, object+"/"+xlStorageFormatFile, false); err != nil { - t.Errorf("Expected er.meta file to be present but stat failed - %v", err) + t.Errorf("Expected xl.meta file to be present but stat failed - %v", err) } erasureDisks := er.getDisks() @@ -565,7 +733,7 @@ func TestHealObjectErasure(t *testing.T) { // Try healing now, expect to receive errDiskNotFound. _, err = obj.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealDeepScan}) - // since majority of er.meta's are not available, object quorum can't be read properly and error will be errErasureReadQuorum + // since majority of xl.meta's are not available, object quorum can't be read properly and error will be errErasureReadQuorum if _, ok := err.(InsufficientReadQuorum); !ok { t.Errorf("Expected %v but received %v", InsufficientReadQuorum{}, err) } diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 610dab8e4..e07ef194e 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1684,112 +1684,120 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re // HealObjectFn closure function heals the object. 
 type HealObjectFn func(bucket, object, versionID string) error
 
-func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject HealObjectFn) error {
-	errCh := make(chan error)
+func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects, healEntry func(metaCacheEntry) error, errCh chan<- error) {
 	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	disks, _ := set.getOnlineDisksWithHealing()
+	if len(disks) == 0 {
+		errCh <- errors.New("listAndHeal: No non-healing disks found")
+		return
+	}
+
+	// How to resolve partial results.
+	resolver := metadataResolutionParams{
+		dirQuorum: 1,
+		objQuorum: 1,
+		bucket:    bucket,
+		strict:    false, // Allow less strict matching.
+	}
+
+	path := baseDirFromPrefix(prefix)
+	if path == "" {
+		path = prefix
+	}
+
+	lopts := listPathRawOptions{
+		disks:          disks,
+		bucket:         bucket,
+		path:           path,
+		recursive:      true,
+		forwardTo:      "",
+		minDisks:       1,
+		reportNotFound: false,
+		agreed: func(entry metaCacheEntry) {
+			if err := healEntry(entry); err != nil {
+				errCh <- err
+				return
+			}
+		},
+		partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
+			entry, ok := entries.resolve(&resolver)
+			if !ok {
+				// check if we can get at least one entry
+				// and proceed to heal nonetheless.
+				entry, _ = entries.firstFound()
+			}
+
+			if err := healEntry(*entry); err != nil {
+				errCh <- err
+				return
+			}
+		},
+		finished: nil,
+	}
+
+	if err := listPathRaw(ctx, lopts); err != nil {
+		errCh <- fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts)
+		return
+	}
+}
+
+func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObjectFn HealObjectFn) error {
+	errCh := make(chan error)
+	healEntry := func(entry metaCacheEntry) error {
+		if entry.isDir() {
+			return nil
+		}
+		// We might land at .metacache, .trash, .multipart;
+		// no need to heal them, so skip them, but only when
+		// the bucket is '.minio.sys'
+		if bucket == minioMetaBucket {
+			if wildcard.Match("buckets/*/.metacache/*", entry.name) {
+				return nil
+			}
+			if wildcard.Match("tmp/*", entry.name) {
+				return nil
+			}
+			if wildcard.Match("multipart/*", entry.name) {
+				return nil
+			}
+			if wildcard.Match("tmp-old/*", entry.name) {
+				return nil
+			}
+		}
+		fivs, err := entry.fileInfoVersions(bucket)
+		if err != nil {
+			return healObjectFn(bucket, entry.name, "")
+		}
+
+		for _, version := range fivs.Versions {
+			if err := healObjectFn(bucket, version.Name, version.VersionID); err != nil {
+				return err
+			}
+		}
+
+		return nil
+	}
+
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	go func() {
 		defer close(errCh)
-		defer cancel()
+
+		var wg sync.WaitGroup
 		for _, erasureSet := range z.serverPools {
-			var wg sync.WaitGroup
 			for _, set := range erasureSet.sets {
-				set := set
 				wg.Add(1)
-				go func() {
+				go func(set *erasureObjects) {
 					defer wg.Done()
-					disks, _ := set.getOnlineDisksWithHealing()
-					if len(disks) == 0 {
-						cancel()
-						errCh <- errors.New("HealObjects: No non-healing disks found")
-						return
-					}
-
-					healEntry := func(entry metaCacheEntry) {
-						if entry.isDir() {
-							return
-						}
-						// We might land at .metacache, .trash, .multipart
-						// no need to heal them skip, only when bucket
-						// is '.minio.sys'
-						if bucket == minioMetaBucket {
-							if wildcard.Match("buckets/*/.metacache/*", entry.name) {
-								return
-							}
-							if wildcard.Match("tmp/*", entry.name) {
-								return
-							}
-							if wildcard.Match("multipart/*", entry.name) {
-								return
-							}
-							if wildcard.Match("tmp-old/*", entry.name) {
-								return
-							}
-						}
-						fivs, err := entry.fileInfoVersions(bucket)
-						if err != nil {
-							if err := healObject(bucket, entry.name, ""); err != nil {
-								cancel()
-								errCh <- err
-								return
-							}
-							return
-						}
-
-						for _, version := range fivs.Versions {
-							if err := healObject(bucket, version.Name, version.VersionID); err != nil {
-								cancel()
-								errCh <- err
-								return
-							}
-						}
-					}
-
-					// How to resolve partial results.
-					resolver := metadataResolutionParams{
-						dirQuorum: 1,
-						objQuorum: 1,
-						bucket:    bucket,
-						strict:    false, // Allow less strict matching.
-					}
-
-					path := baseDirFromPrefix(prefix)
-					if path == "" {
-						path = prefix
-					}
-
-					lopts := listPathRawOptions{
-						disks:          disks,
-						bucket:         bucket,
-						path:           path,
-						recursive:      true,
-						forwardTo:      "",
-						minDisks:       1,
-						reportNotFound: false,
-						agreed:         healEntry,
-						partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
-							entry, ok := entries.resolve(&resolver)
-							if !ok {
-								// check if we can get one entry atleast
-								// proceed to heal nonetheless.
-								entry, _ = entries.firstFound()
-							}
-
-							healEntry(*entry)
-						},
-						finished: nil,
-					}
-
-					if err := listPathRaw(ctx, lopts); err != nil {
-						cancel()
-						errCh <- fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts)
-						return
-					}
-				}()
+					listAndHeal(ctx, bucket, prefix, set, healEntry, errCh)
+				}(set)
 			}
-			wg.Wait()
 		}
+		wg.Wait()
 	}()
 	return <-errCh
 }
@@ -1797,14 +1805,33 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
 func (z *erasureServerPools) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
 	object = encodeDirObject(object)
 
-	for _, pool := range z.serverPools {
-		result, err := pool.HealObject(ctx, bucket, object, versionID, opts)
-		result.Object = decodeDirObject(result.Object)
-		if err != nil {
-			return result, err
-		}
-		return result, nil
+	errs := make([]error, len(z.serverPools))
+	results := make([]madmin.HealResultItem, len(z.serverPools))
+	var wg sync.WaitGroup
+	for idx, pool := range z.serverPools {
+		wg.Add(1)
+		go func(idx int, pool *erasureSets) {
+			defer wg.Done()
+			result, err := pool.HealObject(ctx, bucket, object, versionID, opts)
+			result.Object = decodeDirObject(result.Object)
+			errs[idx] = err
+			results[idx] = result
+		}(idx, pool)
 	}
+	wg.Wait()
+
+	for _, err := range errs {
+		if err != nil {
+			return madmin.HealResultItem{}, err
+		}
+	}
+
+	for _, result := range results {
+		if result.Object != "" {
+			return result, nil
+		}
+	}
+
 	if versionID != "" {
 		return madmin.HealResultItem{}, VersionNotFound{
 			Bucket: bucket,