diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go index d3453d405..d7d865536 100644 --- a/cmd/bitrot-streaming.go +++ b/cmd/bitrot-streaming.go @@ -149,10 +149,9 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) { b.h.Write(buf) if !bytes.Equal(b.h.Sum(nil), b.hashBytes) { - err := &errHashMismatch{fmt.Sprintf("Disk: %s -> %s/%s - content hash does not match - expected %s, got %s", - b.disk, b.volume, b.filePath, hex.EncodeToString(b.hashBytes), hex.EncodeToString(b.h.Sum(nil)))} - logger.LogIf(GlobalContext, err) - return 0, err + logger.LogIf(GlobalContext, fmt.Errorf("Disk: %s -> %s/%s - content hash does not match - expected %s, got %s", + b.disk, b.volume, b.filePath, hex.EncodeToString(b.hashBytes), hex.EncodeToString(b.h.Sum(nil)))) + return 0, errFileCorrupt } b.currOffset += int64(len(buf)) return len(buf), nil diff --git a/cmd/erasure-decode.go b/cmd/erasure-decode.go index 2d3965dfd..5200d2441 100644 --- a/cmd/erasure-decode.go +++ b/cmd/erasure-decode.go @@ -26,8 +26,6 @@ import ( "github.com/minio/minio/cmd/logger" ) -var errHealRequired = errors.New("heal required") - // Reads in parallel from readers. type parallelReader struct { readers []io.ReaderAt @@ -125,7 +123,8 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) { readTriggerCh <- true } - healRequired := int32(0) // Atomic bool flag. + bitrotHeal := int32(0) // Atomic bool flag. + missingPartsHeal := int32(0) // Atomic bool flag. 
readerIndex := 0 var wg sync.WaitGroup // if readTrigger is true, it implies next disk.ReadAt() should be tried @@ -164,8 +163,10 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) { p.buf[bufIdx] = p.buf[bufIdx][:p.shardSize] _, err := rr.ReadAt(p.buf[bufIdx], p.offset) if err != nil { - if _, ok := err.(*errHashMismatch); ok { - atomic.StoreInt32(&healRequired, 1) + if errors.Is(err, errFileNotFound) { + atomic.StoreInt32(&missingPartsHeal, 1) + } else if errors.Is(err, errFileCorrupt) { + atomic.StoreInt32(&bitrotHeal, 1) } // This will be communicated upstream. @@ -188,8 +189,10 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) { wg.Wait() if p.canDecode(newBuf) { p.offset += p.shardSize - if healRequired != 0 { - return newBuf, errHealRequired + if atomic.LoadInt32(&missingPartsHeal) == 1 { + return newBuf, errFileNotFound + } else if atomic.LoadInt32(&bitrotHeal) == 1 { + return newBuf, errFileCorrupt } return newBuf, nil } @@ -197,41 +200,20 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) { return nil, reduceReadQuorumErrs(context.Background(), p.errs, objectOpIgnoredErrs, p.dataBlocks) } -type errDecodeHealRequired struct { - err error -} - -func (err *errDecodeHealRequired) Error() string { - return err.err.Error() -} - -func (err *errDecodeHealRequired) Unwrap() error { - return err.err -} - // Decode reads from readers, reconstructs data if needed and writes the data to the writer. // A set of preferred drives can be supplied. In that case they will be used and the data reconstructed. -func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64, prefer []bool) error { - healRequired, err := e.decode(ctx, writer, readers, offset, length, totalLength, prefer) - if healRequired { - return &errDecodeHealRequired{err} - } - - return err -} - -// Decode reads from readers, reconstructs data if needed and writes the data to the writer. 
-func (e Erasure) decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64, prefer []bool) (bool, error) { +func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64, prefer []bool) (written int64, derr error) { if offset < 0 || length < 0 { logger.LogIf(ctx, errInvalidArgument) - return false, errInvalidArgument + return -1, errInvalidArgument } if offset+length > totalLength { logger.LogIf(ctx, errInvalidArgument) - return false, errInvalidArgument + return -1, errInvalidArgument } + if length == 0 { - return false, nil + return 0, nil } reader := newParallelReader(readers, e, offset, totalLength) @@ -242,7 +224,6 @@ func (e Erasure) decode(ctx context.Context, writer io.Writer, readers []io.Read startBlock := offset / e.blockSize endBlock := (offset + length) / e.blockSize - var healRequired bool var bytesWritten int64 var bufs [][]byte for block := startBlock; block <= endBlock; block++ { @@ -264,32 +245,39 @@ func (e Erasure) decode(ctx context.Context, writer io.Writer, readers []io.Read if blockLength == 0 { break } + var err error bufs, err = reader.Read(bufs) - if err != nil { - if errors.Is(err, errHealRequired) { - // errHealRequired is only returned if there are be enough data for reconstruction. - healRequired = true - } else { - return healRequired, err + if len(bufs) > 0 { + // Set only if there is enough data for reconstruction, + // and only for expected errors; set it just once. + if errors.Is(err, errFileNotFound) || errors.Is(err, errFileCorrupt) { + if derr == nil { + derr = err + } } + } else if err != nil { + // For all errors that cannot be reconstructed, fail the read operation.
+ return -1, err } if err = e.DecodeDataBlocks(bufs); err != nil { logger.LogIf(ctx, err) - return healRequired, err + return -1, err } n, err := writeDataBlocks(ctx, writer, bufs, e.dataBlocks, blockOffset, blockLength) if err != nil { - return healRequired, err + return -1, err } + bytesWritten += n } + if bytesWritten != length { logger.LogIf(ctx, errLessData) - return healRequired, errLessData + return bytesWritten, errLessData } - return healRequired, nil + return bytesWritten, derr } diff --git a/cmd/erasure-decode_test.go b/cmd/erasure-decode_test.go index d622a8b3d..46e0dcb2b 100644 --- a/cmd/erasure-decode_test.go +++ b/cmd/erasure-decode_test.go @@ -138,7 +138,7 @@ func TestErasureDecode(t *testing.T) { } writer := bytes.NewBuffer(nil) - err = erasure.Decode(context.Background(), writer, bitrotReaders, test.offset, test.length, test.data, nil) + _, err = erasure.Decode(context.Background(), writer, bitrotReaders, test.offset, test.length, test.data, nil) closeBitrotReaders(bitrotReaders) if err != nil && !test.shouldFail { t.Errorf("Test %d: should pass but failed with: %v", i, err) @@ -181,7 +181,7 @@ func TestErasureDecode(t *testing.T) { bitrotReaders[0] = nil } writer.Reset() - err = erasure.Decode(context.Background(), writer, bitrotReaders, test.offset, test.length, test.data, nil) + _, err = erasure.Decode(context.Background(), writer, bitrotReaders, test.offset, test.length, test.data, nil) closeBitrotReaders(bitrotReaders) if err != nil && !test.shouldFailQuorum { t.Errorf("Test %d: should pass but failed with: %v", i, err) @@ -272,7 +272,7 @@ func TestErasureDecodeRandomOffsetLength(t *testing.T) { tillOffset := erasure.ShardFileOffset(offset, readLen, length) bitrotReaders[index] = newStreamingBitrotReader(disk, nil, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) } - err = erasure.Decode(context.Background(), buf, bitrotReaders, offset, readLen, length, nil) + _, err = erasure.Decode(context.Background(), buf, 
bitrotReaders, offset, readLen, length, nil) closeBitrotReaders(bitrotReaders) if err != nil { t.Fatal(err, offset, readLen) @@ -334,7 +334,7 @@ func benchmarkErasureDecode(data, parity, dataDown, parityDown int, size int64, tillOffset := erasure.ShardFileOffset(0, size, size) bitrotReaders[index] = newStreamingBitrotReader(disk, nil, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) } - if err = erasure.Decode(context.Background(), bytes.NewBuffer(content[:0]), bitrotReaders, 0, size, size, nil); err != nil { + if _, err = erasure.Decode(context.Background(), bytes.NewBuffer(content[:0]), bitrotReaders, 0, size, size, nil); err != nil { panic(err) } closeBitrotReaders(bitrotReaders) diff --git a/cmd/erasure-lowlevel-heal.go b/cmd/erasure-lowlevel-heal.go index 66b031d92..eedc05474 100644 --- a/cmd/erasure-lowlevel-heal.go +++ b/cmd/erasure-lowlevel-heal.go @@ -28,7 +28,7 @@ import ( func (e Erasure) Heal(ctx context.Context, readers []io.ReaderAt, writers []io.Writer, size int64) error { r, w := io.Pipe() go func() { - if err := e.Decode(ctx, w, readers, 0, size, size, nil); err != nil { + if _, err := e.Decode(ctx, w, readers, 0, size, size, nil); err != nil { w.CloseWithError(err) return } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 2e6a86d17..86ef74dd7 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -32,6 +32,7 @@ import ( "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/bucket/lifecycle" "github.com/minio/minio/pkg/bucket/replication" + "github.com/minio/minio/pkg/madmin" "github.com/minio/minio/pkg/mimedb" "github.com/minio/minio/pkg/sync/errgroup" ) @@ -315,17 +316,29 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje // Prefer local disks prefer[index] = disk.Hostname() == "" } - err = erasure.Decode(ctx, writer, readers, partOffset, partLength, partSize, prefer) + + written, err := erasure.Decode(ctx, writer, readers, partOffset, 
partLength, partSize, prefer) // Note: we should not be defer'ing the following closeBitrotReaders() call as // we are inside a for loop i.e if we use defer, we would accumulate a lot of open files by the time // we return from this function. closeBitrotReaders(readers) if err != nil { - if decodeHealErr, ok := err.(*errDecodeHealRequired); ok { - healOnce.Do(func() { - go deepHealObject(bucket, object, fi.VersionID) - }) - err = decodeHealErr.err + // If we have successfully written all the content that was asked + // by the client, but we still see an error - this would mean + // that we have some parts or data blocks missing or corrupted + // - attempt a heal to successfully heal them for future calls. + if written == partLength { + var scan madmin.HealScanMode + if errors.Is(err, errFileNotFound) { + scan = madmin.HealNormalScan + } else if errors.Is(err, errFileCorrupt) { + scan = madmin.HealDeepScan + } + if scan != madmin.HealUnknownScan { + healOnce.Do(func() { + go healObject(bucket, object, fi.VersionID, scan) + }) + } } if err != nil { return toObjectErr(err, bucket, object) @@ -416,6 +429,24 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s if err != nil { return fi, nil, nil, err } + + var missingBlocks int + for i, err := range errs { + if err != nil && errors.Is(err, errFileNotFound) { + missingBlocks++ + continue + } + if metaArr[i].IsValid() && metaArr[i].ModTime.Equal(fi.ModTime) { + continue + } + missingBlocks++ + } + + // if missing metadata can be reconstructed, attempt to reconstruct. 
+ if missingBlocks > 0 && missingBlocks < readQuorum { + go healObject(bucket, object, fi.VersionID, madmin.HealNormalScan) + } + return fi, metaArr, onlineDisks, nil } diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 53d97312f..bb62404b0 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -200,8 +200,8 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis return nil } -// deepHealObject heals given object path in deep to fix bitrot. -func deepHealObject(bucket, object, versionID string) { +// healObject heals the given object path using the supplied heal scan mode. +func healObject(bucket, object, versionID string, scan madmin.HealScanMode) { // Get background heal sequence to send elements to heal bgSeq, ok := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID) if ok { @@ -211,7 +211,7 @@ func deepHealObject(bucket, object, versionID string) { versionID: versionID, opts: &madmin.HealOpts{ Remove: true, // if found dangling purge it. - ScanMode: madmin.HealDeepScan, + ScanMode: scan, }, } } diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 4de76d3d6..ab653d0ca 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - storageRESTVersion = "v24" // Add more small file optimization + storageRESTVersion = "v25" // Propagate errFileCorrupt/errFileNotFound for heal scan modes + storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) diff --git a/pkg/madmin/heal-commands.go b/pkg/madmin/heal-commands.go index 80cd2c74d..b49783f70 100644 --- a/pkg/madmin/heal-commands.go +++ b/pkg/madmin/heal-commands.go @@ -31,8 +31,12 @@ import ( type HealScanMode int const ( + // HealUnknownScan is the default, unspecified scan mode + HealUnknownScan HealScanMode = iota + // HealNormalScan checks if parts are present and not outdated - HealNormalScan HealScanMode = iota + HealNormalScan + // HealDeepScan checks for parts bitrot checksums
HealDeepScan )