From 4c9fae90ff77f5092597c1413b8527375c650ff4 Mon Sep 17 00:00:00 2001
From: Aditya Manthramurthy
Date: Fri, 29 Sep 2017 04:27:19 +0530
Subject: [PATCH] Optimize healObject by eliminating extra data passes (#4949)

---
 cmd/erasure-healfile.go      |  66 ++++++++++++-------
 cmd/erasure-healfile_test.go |   6 +-
 cmd/xl-v1-healing-common.go  |  68 +++++++++-----------
 cmd/xl-v1-healing.go         | 121 +++++++++++++++++++----------
 4 files changed, 144 insertions(+), 117 deletions(-)

diff --git a/cmd/erasure-healfile.go b/cmd/erasure-healfile.go
index 9de9a00f4..48e816cb9 100644
--- a/cmd/erasure-healfile.go
+++ b/cmd/erasure-healfile.go
@@ -17,13 +17,16 @@
 package cmd

 import (
+	"fmt"
 	"hash"
+	"strings"
 )

 // HealFile tries to reconstruct an erasure-coded file spread over all
 // available disks. HealFile will read the valid parts of the file,
 // reconstruct the missing data and write the reconstructed parts back
-// to `staleDisks`.
+// to `staleDisks` at the destination `dstVol/dstPath/`. Parts are
+// verified against the given BitrotAlgorithm and checksums.
 //
 // `staleDisks` is a slice of disks where each non-nil entry has stale
 // or no data, and so will be healed.
@@ -34,19 +37,17 @@ import (
 // In addition, `staleDisks` and `s.disks` must have the same ordering
 // of disks w.r.t. erasure coding of the object.
 //
-// The function will try to read the valid parts from the file under
-// the given volume and path and tries to reconstruct the file under
-// the given healVolume and healPath (on staleDisks). The given
-// algorithm will be used to verify the valid parts and to protect the
-// reconstructed file.
+// Errors when writing to `staleDisks` are not propagated as long as
+// writes succeed for at least one disk. This allows partial healing
+// despite stale disks being faulty.
 //
-// It returns bitrot checksums for the non-nil staleDisks.
-func (s ErasureStorage) HealFile(staleDisks []StorageAPI, volume, path string,
-	blocksize int64, healVolume, healPath string, size int64,
-	algorithm BitrotAlgorithm, checksums [][]byte) (f ErasureFileInfo,
-	err error) {
+// It returns bitrot checksums for the non-nil staleDisks on which
+// healing succeeded.
+func (s ErasureStorage) HealFile(staleDisks []StorageAPI, volume, path string, blocksize int64,
+	dstVol, dstPath string, size int64, alg BitrotAlgorithm, checksums [][]byte) (
+	f ErasureFileInfo, err error) {

-	if !algorithm.Available() {
+	if !alg.Available() {
 		return f, traceError(errBitrotHashAlgoInvalid)
 	}

@@ -57,15 +58,15 @@ func (s ErasureStorage) HealFile(staleDisks []StorageAPI, volume, path string,
 	for i, disk := range s.disks {
 		switch {
 		case staleDisks[i] != nil:
-			hashers[i] = algorithm.New()
+			hashers[i] = alg.New()
 		case disk == nil:
 			// disregard unavailable disk
 			continue
 		default:
-			verifiers[i] = NewBitrotVerifier(algorithm, checksums[i])
-			f.Checksums[i] = checksums[i]
+			verifiers[i] = NewBitrotVerifier(alg, checksums[i])
 		}
 	}
+	writeErrors := make([]error, len(s.disks))

 	// Scan part files on disk, block-by-block reconstruct it and
 	// write to stale disks.
@@ -125,27 +126,48 @@ func (s ErasureStorage) HealFile(staleDisks []StorageAPI, volume, path string,
 		// write computed shards as chunks on file in each
 		// stale disk
+		writeSucceeded := false
 		for i, disk := range staleDisks {
-			if disk == nil {
+			// skip nil disk or disk that had error on
+			// previous write
+			if disk == nil || writeErrors[i] != nil {
 				continue
 			}
-			err = disk.AppendFile(healVolume, healPath, blocks[i])
-			if err != nil {
-				return f, traceError(err)
+			writeErrors[i] = disk.AppendFile(dstVol, dstPath, blocks[i])
+			if writeErrors[i] == nil {
+				hashers[i].Write(blocks[i])
+				writeSucceeded = true
 			}
-			hashers[i].Write(blocks[i])
+		}
+
+		// If all disks had write errors we quit.
+		if !writeSucceeded {
+			// build error from all write errors
+			return f, traceError(joinWriteErrors(writeErrors))
 		}
 	}

 	// copy computed file hashes into output variable
 	f.Size = size
-	f.Algorithm = algorithm
+	f.Algorithm = alg
 	for i, disk := range staleDisks {
-		if disk == nil {
+		if disk == nil || writeErrors[i] != nil {
 			continue
 		}
 		f.Checksums[i] = hashers[i].Sum(nil)
 	}
 	return f, nil
 }
+
+func joinWriteErrors(errs []error) error {
+	msgs := []string{}
+	for i, err := range errs {
+		if err == nil {
+			continue
+		}
+		msgs = append(msgs, fmt.Sprintf("disk %d: %v", i+1, err))
+	}
+	return fmt.Errorf("all stale disks had write errors during healing: %s",
+		strings.Join(msgs, ", "))
+}
diff --git a/cmd/erasure-healfile_test.go b/cmd/erasure-healfile_test.go
index c5bc4d39b..aaf99103c 100644
--- a/cmd/erasure-healfile_test.go
+++ b/cmd/erasure-healfile_test.go
@@ -44,7 +44,7 @@ var erasureHealFileTests = []struct {
 	{dataBlocks: 5, disks: 10, offDisks: 3, badDisks: 1, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 3
 	{dataBlocks: 6, disks: 12, offDisks: 2, badDisks: 3, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: SHA256, shouldFail: false}, // 4
 	{dataBlocks: 7, disks: 14, offDisks: 4, badDisks: 1, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 5
-	{dataBlocks: 8, disks: 16, offDisks: 6, badDisks: 1, badStaleDisks: 1, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: true}, // 6
+	{dataBlocks: 8, disks: 16, offDisks: 6, badDisks: 1, badStaleDisks: 1, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 6
 	{dataBlocks: 7, disks: 14, offDisks: 2, badDisks: 3, badStaleDisks: 0, blocksize: int64(oneMiByte / 2), size: oneMiByte, algorithm: BLAKE2b512, shouldFail: false}, // 7
 	{dataBlocks: 6, disks: 12, offDisks: 1, badDisks: 0, badStaleDisks: 1, blocksize: int64(oneMiByte - 1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: true}, // 8
 	{dataBlocks: 5, disks: 10, offDisks: 3, badDisks: 0, badStaleDisks: 3, blocksize: int64(oneMiByte / 2), size: oneMiByte, algorithm: SHA256, shouldFail: true}, // 9
@@ -54,7 +54,7 @@ var erasureHealFileTests = []struct {
 	{dataBlocks: 7, disks: 14, offDisks: 3, badDisks: 4, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: BLAKE2b512, shouldFail: false}, // 13
 	{dataBlocks: 7, disks: 14, offDisks: 6, badDisks: 1, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 14
 	{dataBlocks: 8, disks: 16, offDisks: 4, badDisks: 5, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: true}, // 15
-	{dataBlocks: 2, disks: 4, offDisks: 0, badDisks: 0, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 16
+	{dataBlocks: 2, disks: 4, offDisks: 1, badDisks: 0, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 16
 	{dataBlocks: 2, disks: 4, offDisks: 0, badDisks: 0, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: 0, shouldFail: true}, // 17
 	{dataBlocks: 12, disks: 16, offDisks: 2, badDisks: 1, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 18
 	{dataBlocks: 6, disks: 8, offDisks: 1, badDisks: 0, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: BLAKE2b512, shouldFail: false}, // 19
@@ -130,7 +130,7 @@ func TestErasureHealFile(t *testing.T) {
 	// Verify that checksums of staleDisks
 	// match expected values
 	for i, disk := range staleDisks {
-		if disk == nil {
+		if disk == nil || info.Checksums[i] == nil {
 			continue
 		}
 		if !reflect.DeepEqual(info.Checksums[i], file.Checksums[i]) {
diff --git a/cmd/xl-v1-healing-common.go b/cmd/xl-v1-healing-common.go
index 75dd2b9f5..4fd2485a7 100644
--- a/cmd/xl-v1-healing-common.go
+++ b/cmd/xl-v1-healing-common.go
@@ -17,23 +17,10 @@
 package cmd

 import (
-	"crypto/subtle"
 	"path/filepath"
-	"sync"
 	"time"
-
-	"io"
 )

-// healBufferPool is a pool of reusable buffers used to verify a stream
-// while healing.
-var healBufferPool = sync.Pool{
-	New: func() interface{} {
-		b := make([]byte, readSizeV1)
-		return &b
-	},
-}
-
 // commonTime returns a maximally occurring time from a list of time.
 func commonTime(modTimes []time.Time) (modTime time.Time, count int) {
 	var maxima int // Counter for remembering max occurrence of elements.
@@ -257,42 +244,49 @@ func xlHealStat(xl xlObjects, partsMetadata []xlMetaV1, errs []error) HealObject
 // disksWithAllParts - This function needs to be called with
 // []StorageAPI returned by listOnlineDisks. Returns,
+//
 // - disks which have all parts specified in the latest xl.json.
+//
 // - errs updated to have errFileNotFound in place of disks that had
-// missing parts.
-// - non-nil error if any of the online disks failed during
-// calculating blake2b checksum.
-func disksWithAllParts(onlineDisks []StorageAPI, partsMetadata []xlMetaV1, errs []error, bucket, object string) ([]StorageAPI, []error, error) {
-	availableDisks := make([]StorageAPI, len(onlineDisks))
-	buffer := healBufferPool.Get().(*[]byte)
-	defer healBufferPool.Put(buffer)
+// missing or corrupted parts.
+//
+// - non-nil error if any of the disks failed unexpectedly (i.e. error
+// other than file not found and not a checksum error).
+func disksWithAllParts(onlineDisks []StorageAPI, partsMetadata []xlMetaV1, errs []error, bucket,
+	object string) ([]StorageAPI, []error, error) {

-	for diskIndex, onlineDisk := range onlineDisks {
+	availableDisks := make([]StorageAPI, len(onlineDisks))
+	buffer := []byte{}
+
+	for i, onlineDisk := range onlineDisks {
 		if onlineDisk == OfflineDisk {
 			continue
 		}

 		// disk has a valid xl.json but may not have all the
 		// parts. This is considered an outdated disk, since
 		// it needs healing too.
-		for _, part := range partsMetadata[diskIndex].Parts {
+		for _, part := range partsMetadata[i].Parts {
 			partPath := filepath.Join(object, part.Name)
-			checkSumInfo := partsMetadata[diskIndex].Erasure.GetChecksumInfo(part.Name)
-			hash := checkSumInfo.Algorithm.New()
-			_, hErr := io.CopyBuffer(hash, StorageReader(onlineDisk, bucket, partPath, 0), *buffer)
-			if hErr == errFileNotFound {
-				errs[diskIndex] = errFileNotFound
-				availableDisks[diskIndex] = OfflineDisk
-				break
-			}
-			if hErr != nil && hErr != errFileNotFound {
+			checksumInfo := partsMetadata[i].Erasure.GetChecksumInfo(part.Name)
+			verifier := NewBitrotVerifier(checksumInfo.Algorithm, checksumInfo.Hash)
+
+			// verification happens even if a 0-length
+			// buffer is passed
+			_, hErr := onlineDisk.ReadFile(bucket, partPath, 0, buffer, verifier)
+			if hErr != nil {
+				_, isCorrupted := hErr.(hashMismatchError)
+				if isCorrupted || hErr == errFileNotFound {
+					errs[i] = errFileNotFound
+					availableDisks[i] = OfflineDisk
+					break
+				}
 				return nil, nil, traceError(hErr)
 			}
-			if subtle.ConstantTimeCompare(hash.Sum(nil), checkSumInfo.Hash) != 1 {
-				errs[diskIndex] = errFileNotFound
-				availableDisks[diskIndex] = OfflineDisk
-				break
-			}
-			availableDisks[diskIndex] = onlineDisk
+		}
+
+		if errs[i] == nil {
+			// All parts verified, mark it as all data available.
+			availableDisks[i] = onlineDisk
 		}
 	}
diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go
index f69da6cac..8eea5fca3 100644
--- a/cmd/xl-v1-healing.go
+++ b/cmd/xl-v1-healing.go
@@ -333,25 +333,24 @@ func quickHeal(storageDisks []StorageAPI, writeQuorum int, readQuorum int) error
 }

 // Heals an object only the corrupted/missing erasure blocks.
-func healObject(storageDisks []StorageAPI, bucket string, object string, quorum int) (int, int, error) {
+func healObject(storageDisks []StorageAPI, bucket, object string, quorum int) (int, int, error) {
+	partsMetadata, errs := readAllXLMetadata(storageDisks, bucket, object)

 	// readQuorum suffices for xl.json since we use monotonic
 	// system time to break the tie when a split-brain situation
 	// arises.
-	if reducedErr := reduceReadQuorumErrs(errs, nil, quorum); reducedErr != nil {
-		return 0, 0, toObjectErr(reducedErr, bucket, object)
-	}
-
-	if !xlShouldHeal(storageDisks, partsMetadata, errs, bucket, object) {
-		// There is nothing to heal.
-		return 0, 0, nil
+	if rErr := reduceReadQuorumErrs(errs, nil, quorum); rErr != nil {
+		return 0, 0, toObjectErr(rErr, bucket, object)
 	}

 	// List of disks having latest version of the object.
 	latestDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)

-	// List of disks having all parts as per latest xl.json.
-	availableDisks, errs, aErr := disksWithAllParts(latestDisks, partsMetadata, errs, bucket, object)
+	// List of disks having all parts as per latest xl.json - this
+	// does a full pass over the data and verifies all part files
+	// on disk
+	availableDisks, errs, aErr := disksWithAllParts(latestDisks, partsMetadata, errs, bucket,
+		object)
 	if aErr != nil {
 		return 0, 0, toObjectErr(aErr, bucket, object)
 	}
@@ -359,8 +358,7 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
 	// Number of disks which don't serve data.
 	numOfflineDisks := 0
 	for index, disk := range storageDisks {
-		switch {
-		case disk == nil, errs[index] == errDiskNotFound:
+		if disk == nil || errs[index] == errDiskNotFound {
 			numOfflineDisks++
 		}
 	}
@@ -368,12 +366,16 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
 	// Number of disks which have all parts of the given object.
 	numAvailableDisks := 0
 	for _, disk := range availableDisks {
-		switch {
-		case disk != nil:
+		if disk != nil {
 			numAvailableDisks++
 		}
 	}

+	if numAvailableDisks == len(storageDisks) {
+		// nothing to heal in this case
+		return 0, 0, nil
+	}
+
 	// If less than read quorum number of disks have all the parts
 	// of the data, we can't reconstruct the erasure-coded data.
 	if numAvailableDisks < quorum {
 	}
@@ -381,8 +383,8 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
 	}

 	// List of disks having outdated version of the object or missing object.
-	outDatedDisks := outDatedDisks(storageDisks, availableDisks, errs, partsMetadata,
-		bucket, object)
+	outDatedDisks := outDatedDisks(storageDisks, availableDisks, errs, partsMetadata, bucket,
+		object)

 	// Number of disks that had outdated content of the given
 	// object and are online to be healed.
@@ -401,9 +403,10 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
 	}
 	for index, disk := range outDatedDisks {
-		// Before healing outdated disks, we need to remove xl.json
-		// and part files from "bucket/object/" so that
-		// rename(minioMetaBucket, "tmp/tmpuuid/", "bucket", "object/") succeeds.
+		// Before healing outdated disks, we need to remove
+		// xl.json and part files from "bucket/object/" so
+		// that rename(minioMetaBucket, "tmp/tmpuuid/",
+		// "bucket", "object/") succeeds.
 		if disk == nil {
 			// Not an outdated disk.
 			continue
 		}
@@ -417,27 +420,15 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
 			continue
 		}

-		// Outdated object with the same name exists that needs to be deleted.
-		outDatedMeta := partsMetadata[index]
-		// Consult valid metadata picked when there is no
-		// metadata available on this disk.
-		if isErr(errs[index], errFileNotFound) {
-			outDatedMeta = latestMeta
-		}
-
-		// Delete all the parts. Ignore if parts are not found.
-		for _, part := range outDatedMeta.Parts {
-			dErr := disk.DeleteFile(bucket, pathJoin(object, part.Name))
-			if dErr != nil && !isErr(dErr, errFileNotFound) {
-				return 0, 0, toObjectErr(traceError(dErr), bucket, object)
+		// List and delete the object directory, ignoring
+		// errors.
+		files, err := disk.ListDir(bucket, object)
+		if err == nil {
+			for _, entry := range files {
+				_ = disk.DeleteFile(bucket,
+					pathJoin(object, entry))
 			}
 		}
-
-		// Delete xl.json file. Ignore if xl.json not found.
-		dErr := disk.DeleteFile(bucket, pathJoin(object, xlMetaJSONFile))
-		if dErr != nil && !isErr(dErr, errFileNotFound) {
-			return 0, 0, toObjectErr(traceError(dErr), bucket, object)
-		}
 	}

 	// Reorder so that we have data disks first and parity disks next.
@@ -445,16 +436,19 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
 	outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution)
 	partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution)

-	// We write at temporary location and then rename to fianal location.
+	// We write at temporary location and then rename to final location.
 	tmpID := mustGetUUID()

-	// Checksum of the part files. checkSumInfos[index] will contain checksums
-	// of all the part files in the outDatedDisks[index]
+	// Checksum of the part files. checkSumInfos[index] will
+	// contain checksums of all the part files in the
+	// outDatedDisks[index]
 	checksumInfos := make([][]ChecksumInfo, len(outDatedDisks))

-	// Heal each part. erasureHealFile() will write the healed part to
-	// .minio/tmp/uuid/ which needs to be renamed later to the final location.
-	storage, err := NewErasureStorage(latestDisks, latestMeta.Erasure.DataBlocks, latestMeta.Erasure.ParityBlocks)
+	// Heal each part. erasureHealFile() will write the healed
+	// part to .minio/tmp/uuid/ which needs to be renamed later to
+	// the final location.
+	storage, err := NewErasureStorage(latestDisks,
+		latestMeta.Erasure.DataBlocks, latestMeta.Erasure.ParityBlocks)
 	if err != nil {
 		return 0, 0, toObjectErr(err, bucket, object)
 	}
@@ -472,14 +466,33 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
 			}
 		}
 		// Heal the part file.
-		file, hErr := storage.HealFile(outDatedDisks, bucket, pathJoin(object, partName), erasure.BlockSize, minioMetaTmpBucket, pathJoin(tmpID, partName), partSize, algorithm, checksums)
+		file, hErr := storage.HealFile(outDatedDisks, bucket, pathJoin(object, partName),
+			erasure.BlockSize, minioMetaTmpBucket, pathJoin(tmpID, partName), partSize,
+			algorithm, checksums)
 		if hErr != nil {
 			return 0, 0, toObjectErr(hErr, bucket, object)
 		}
-		for i := range outDatedDisks {
-			if outDatedDisks[i] != OfflineDisk {
-				checksumInfos[i] = append(checksumInfos[i], ChecksumInfo{partName, file.Algorithm, file.Checksums[i]})
+		// outDatedDisks that had write errors should not be
+		// written to for remaining parts, so we nil it out.
+		for i, disk := range outDatedDisks {
+			if disk == nil {
+				continue
 			}
+			// A non-nil stale disk which did not receive
+			// a healed part checksum had a write error.
+			if file.Checksums[i] == nil {
+				outDatedDisks[i] = nil
+				numHealedDisks--
+				continue
+			}
+			// append part checksums
+			checksumInfos[i] = append(checksumInfos[i],
+				ChecksumInfo{partName, file.Algorithm, file.Checksums[i]})
+		}
+
+		// If all disks are having errors, we give up.
+		if numHealedDisks == 0 {
+			return 0, 0, fmt.Errorf("all disks without up-to-date data had write errors")
+		}
 	}
@@ -493,7 +506,8 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
 	}

 	// Generate and write `xl.json` generated from other disks.
-	outDatedDisks, aErr = writeUniqueXLMetadata(outDatedDisks, minioMetaTmpBucket, tmpID, partsMetadata, diskCount(outDatedDisks))
+	outDatedDisks, aErr = writeUniqueXLMetadata(outDatedDisks, minioMetaTmpBucket, tmpID,
+		partsMetadata, diskCount(outDatedDisks))
 	if aErr != nil {
 		return 0, 0, toObjectErr(aErr, bucket, object)
 	}
@@ -503,13 +517,10 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
 		if disk == nil {
 			continue
 		}
-		// Remove any lingering partial data from current namespace.
-		aErr = disk.DeleteFile(bucket, retainSlash(object))
-		if aErr != nil && aErr != errFileNotFound {
-			return 0, 0, toObjectErr(traceError(aErr), bucket, object)
-		}
+
 		// Attempt a rename now from healed data to final location.
-		aErr = disk.RenameFile(minioMetaTmpBucket, retainSlash(tmpID), bucket, retainSlash(object))
+		aErr = disk.RenameFile(minioMetaTmpBucket, retainSlash(tmpID), bucket,
+			retainSlash(object))
 		if aErr != nil {
 			return 0, 0, toObjectErr(traceError(aErr), bucket, object)
 		}
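
Illustrative sketch (not part of the patch): the per-disk write-error bookkeeping that HealFile gains above can be summarized as a small, self-contained Go program. The fakeDisk type, errDiskFault, and healShards are hypothetical stand-ins for StorageAPI, real storage errors, and the block-write loop; only the skip-after-failure and fail-only-when-no-write-succeeds logic mirrors the change.

package main

import (
	"errors"
	"fmt"
	"strings"
)

var errDiskFault = errors.New("disk fault")

// fakeDisk is a hypothetical stand-in for a StorageAPI disk.
type fakeDisk struct {
	name   string
	broken bool
}

func (d *fakeDisk) AppendFile(volume, path string, buf []byte) error {
	if d.broken {
		return errDiskFault
	}
	return nil
}

// joinWriteErrors mirrors the helper added by the patch: it collapses the
// per-disk write errors into a single error value.
func joinWriteErrors(errs []error) error {
	msgs := []string{}
	for i, err := range errs {
		if err == nil {
			continue
		}
		msgs = append(msgs, fmt.Sprintf("disk %d: %v", i+1, err))
	}
	return fmt.Errorf("all stale disks had write errors during healing: %s",
		strings.Join(msgs, ", "))
}

// healShards sketches the bookkeeping used in HealFile: a disk that fails a
// write is skipped for all later blocks, and the operation as a whole fails
// only when no stale disk accepted a block. shards[b][i] is the shard
// destined for stale disk i in block b.
func healShards(staleDisks []*fakeDisk, shards [][][]byte) error {
	writeErrors := make([]error, len(staleDisks))
	for _, blockShards := range shards {
		writeSucceeded := false
		for i, disk := range staleDisks {
			// Skip offline disks and disks that already failed a write.
			if disk == nil || writeErrors[i] != nil {
				continue
			}
			writeErrors[i] = disk.AppendFile("tmpVol", "tmpPath", blockShards[i])
			if writeErrors[i] == nil {
				writeSucceeded = true
			}
		}
		// Give up only if every remaining stale disk failed this block.
		if !writeSucceeded {
			return joinWriteErrors(writeErrors)
		}
	}
	return nil
}

func main() {
	disks := []*fakeDisk{{name: "d1"}, {name: "d2", broken: true}, nil}
	shards := [][][]byte{
		{{0x01}, {0x02}, {0x03}},
		{{0x04}, {0x05}, {0x06}},
	}
	fmt.Println("heal result:", healShards(disks, shards))
}

On this toy setup the sketch reports success because d1 keeps accepting writes even though d2 is faulty and the third disk is offline; marking every disk broken makes it return the joined error instead, which is the behavior the patch adds to healing.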