diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go
index 857a2f8cd..547e21a0a 100644
--- a/cmd/bitrot-streaming.go
+++ b/cmd/bitrot-streaming.go
@@ -94,16 +94,23 @@ func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length i
 	r, w := io.Pipe()
 	h := algo.New()
 
-	bw := &streamingBitrotWriter{iow: w, closeWithErr: w.CloseWithError, h: h, shardSize: shardSize, canClose: &sync.WaitGroup{}}
+	bw := &streamingBitrotWriter{
+		iow:          ioutil.NewDeadlineWriter(w, diskMaxTimeout),
+		closeWithErr: w.CloseWithError,
+		h:            h,
+		shardSize:    shardSize,
+		canClose:     &sync.WaitGroup{},
+	}
 	bw.canClose.Add(1)
 	go func() {
+		defer bw.canClose.Done()
+
 		totalFileSize := int64(-1) // For compressed objects length will be unknown (represented by length=-1)
 		if length != -1 {
 			bitrotSumsTotalSize := ceilFrac(length, shardSize) * int64(h.Size()) // Size used for storing bitrot checksums.
 			totalFileSize = bitrotSumsTotalSize + length
 		}
 		r.CloseWithError(disk.CreateFile(context.TODO(), volume, filePath, totalFileSize, r))
-		bw.canClose.Done()
 	}()
 	return bw
 }
diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go
index f848c2f6a..cdaabf628 100644
--- a/cmd/erasure-common.go
+++ b/cmd/erasure-common.go
@@ -20,6 +20,7 @@ package cmd
 import (
 	"context"
 	"fmt"
+	"io"
 	"math/rand"
 	"sync"
 	"time"
@@ -188,18 +189,21 @@ func readMultipleFiles(ctx context.Context, disks []StorageAPI, req ReadMultiple
 		dataArray = append(dataArray, toAdd)
 	}
 
+	ignoredErrs := []error{
+		errFileNotFound,
+		errVolumeNotFound,
+		errFileVersionNotFound,
+		io.ErrUnexpectedEOF, // sometimes we would read without locks, ignore these errors
+		io.EOF,              // sometimes we would read without locks, ignore these errors
+	}
+	ignoredErrs = append(ignoredErrs, objectOpIgnoredErrs...)
+
 	errs := g.Wait()
 	for index, err := range errs {
 		if err == nil {
 			continue
 		}
-		if !IsErr(err, []error{
-			errFileNotFound,
-			errVolumeNotFound,
-			errFileVersionNotFound,
-			errDiskNotFound,
-			errUnformattedDisk,
-		}...) {
+		if !IsErr(err, ignoredErrs...) {
 			logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)",
 				disks[index], req.Bucket, req.Prefix, err),
 				disks[index].String())
diff --git a/cmd/erasure-metadata-utils.go b/cmd/erasure-metadata-utils.go
index fae041e2b..fc15dafb8 100644
--- a/cmd/erasure-metadata-utils.go
+++ b/cmd/erasure-metadata-utils.go
@@ -170,11 +170,10 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve
 		errFileNotFound,
 		errVolumeNotFound,
 		errFileVersionNotFound,
-		errDiskNotFound,
-		errUnformattedDisk,
 		io.ErrUnexpectedEOF, // sometimes we would read without locks, ignore these errors
 		io.EOF,              // sometimes we would read without locks, ignore these errors
 	}
+	ignoredErrs = append(ignoredErrs, objectOpIgnoredErrs...)
 	errs := g.Wait()
 	for index, err := range errs {
 		if err == nil {
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index 52c081a18..3c9fa7148 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -571,10 +571,10 @@ func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, r
 		errFileNameTooLong,
 		errVolumeNotFound,
 		errFileVersionNotFound,
-		errDiskNotFound,
 		io.ErrUnexpectedEOF, // sometimes we would read without locks, ignore these errors
 		io.EOF,              // sometimes we would read without locks, ignore these errors
 	}
+	ignoredErrs = append(ignoredErrs, objectOpIgnoredErrs...)
 
 	errs := g.Wait()
 	for index, err := range errs {
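
All three read paths above now filter per-drive errors through a shared allowlist before logging, instead of repeating inline slices at each call site. The sketch below illustrates the filtering idea only; `IsErr` here is a hypothetical stand-in with the variadic signature used in the diff, and the sentinel errors are local placeholders rather than MinIO's internals.

```go
package main

import (
	"errors"
	"fmt"
)

// Placeholder sentinels; MinIO defines its own errFileNotFound etc.
var (
	errFileNotFound = errors.New("file not found")
	errDiskNotFound = errors.New("drive not found")
)

// IsErr reports whether err matches any error in the allowlist.
// Assumed signature, mirroring the call sites in the diff above.
func IsErr(err error, errs ...error) bool {
	for _, e := range errs {
		if errors.Is(err, e) {
			return true
		}
	}
	return false
}

func main() {
	ignoredErrs := []error{errFileNotFound}
	// Extend the allowlist the same way the diff appends objectOpIgnoredErrs.
	ignoredErrs = append(ignoredErrs, errDiskNotFound)

	fmt.Println(IsErr(errFileNotFound, ignoredErrs...))         // true: suppressed
	fmt.Println(IsErr(errors.New("i/o error"), ignoredErrs...)) // false: logged
}
```
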
diff --git a/cmd/peer-s3-client.go b/cmd/peer-s3-client.go
index 23a85a7c0..f86d6f909 100644
--- a/cmd/peer-s3-client.go
+++ b/cmd/peer-s3-client.go
@@ -260,7 +260,7 @@ func (sys *S3PeerSys) MakeBucket(ctx context.Context, bucket string, opts MakeBu
 			perPoolErrs = append(perPoolErrs, errs[i])
 		}
 	}
-	if poolErr := reduceReadQuorumErrs(ctx, perPoolErrs, bucketOpIgnoredErrs, len(perPoolErrs)/2+1); poolErr != nil {
+	if poolErr := reduceWriteQuorumErrs(ctx, perPoolErrs, bucketOpIgnoredErrs, len(perPoolErrs)/2+1); poolErr != nil {
 		return toObjectErr(poolErr, bucket)
 	}
 }
@@ -303,7 +303,7 @@ func (sys *S3PeerSys) DeleteBucket(ctx context.Context, bucket string, opts Dele
 			perPoolErrs = append(perPoolErrs, errs[i])
 		}
 	}
-	if poolErr := reduceReadQuorumErrs(ctx, perPoolErrs, bucketOpIgnoredErrs, len(perPoolErrs)/2+1); poolErr != nil && poolErr != errVolumeNotFound {
+	if poolErr := reduceWriteQuorumErrs(ctx, perPoolErrs, bucketOpIgnoredErrs, len(perPoolErrs)/2+1); poolErr != nil && poolErr != errVolumeNotFound {
 		// re-create successful deletes, since we are returning an error.
 		sys.MakeBucket(ctx, bucket, MakeBucketOptions{})
 		return toObjectErr(poolErr, bucket)
diff --git a/cmd/peer-s3-server.go b/cmd/peer-s3-server.go
index 21fbf5a23..fdf1661ad 100644
--- a/cmd/peer-s3-server.go
+++ b/cmd/peer-s3-server.go
@@ -210,13 +210,12 @@ func deleteBucketLocal(ctx context.Context, bucket string, opts DeleteBucketOpti
 		}
 	}
 
-	for _, err := range errs {
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
+	// Since we recreated buckets and error was `not-empty`, return not-empty.
+	if recreate {
+		return errVolumeNotEmpty
+	}
+	// for all other errors reduce by write quorum.
+	return reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, (len(globalLocalDrives)/2)+1)
 }
 
 func makeBucketLocal(ctx context.Context, bucket string, opts MakeBucketOptions) error {
@@ -239,14 +238,18 @@ func makeBucketLocal(ctx context.Context, bucket string, opts MakeBucketOptions)
 				// requested.
 				return nil
 			}
-			if err != nil && !errors.Is(err, errVolumeExists) {
-				logger.LogIf(ctx, err)
-			}
 			return err
 		}, index)
 	}
 
 	errs := g.Wait()
+
+	for _, err := range errs {
+		if err != nil && !IsErr(err, bucketOpIgnoredErrs...) {
+			logger.LogIf(ctx, err)
+		}
+	}
+
 	return reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, (len(globalLocalDrives)/2)+1)
 }
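
MakeBucket and DeleteBucket above now reduce per-pool errors against a write quorum of `len(perPoolErrs)/2+1`, i.e. a strict majority must succeed. The toy reducer below shows the counting idea; it is a simplified sketch, not MinIO's `reduceWriteQuorumErrs`, which additionally normalizes errors and skips the ignored-errors list.

```go
package main

import (
	"errors"
	"fmt"
)

var errQuorumNotMet = errors.New("write quorum not met")

// reduceQuorumErrs returns nil if at least quorum calls succeeded, the
// dominant error if one reaches quorum, and a quorum error otherwise.
// Simplified illustration of the majority logic used above; equal errors
// are compared by identity here, which is enough for shared sentinels.
func reduceQuorumErrs(errs []error, quorum int) error {
	counts := make(map[error]int)
	for _, err := range errs {
		counts[err]++ // the nil key counts successes
	}
	for err, n := range counts {
		if n >= quorum {
			return err // may be nil: a quorum of successes
		}
	}
	return errQuorumNotMet
}

func main() {
	errs := []error{nil, nil, errors.New("drive offline")}
	fmt.Println(reduceQuorumErrs(errs, len(errs)/2+1)) // <nil>: 2 of 3 succeeded
}
```
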
diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go
index eacc2cb49..3424e2cc0 100644
--- a/cmd/xl-storage-disk-id-check.go
+++ b/cmd/xl-storage-disk-id-check.go
@@ -270,6 +270,11 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context) (info DiskInfo, err
 	si := p.updateStorageMetrics(storageMetricDiskInfo)
 	defer si(&err)
 
+	if p.health.isFaulty() {
+		// if disk is already faulty return faulty for 'mc admin info' output and prometheus alerts.
+		return info, errFaultyDisk
+	}
+
 	info, err = p.storage.DiskInfo(ctx)
 	if err != nil {
 		return info, err
@@ -278,15 +283,8 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context) (info DiskInfo, err
 	info.Metrics = p.getMetrics()
 	// check cached diskID against backend
 	// only if it's non-empty.
-	if p.diskID != "" {
-		if p.diskID != info.ID {
-			return info, errDiskNotFound
-		}
-	}
-
-	if p.health.isFaulty() {
-		// if disk is already faulty return faulty for 'mc admin info' output and prometheus alerts.
-		return info, errFaultyDisk
+	if p.diskID != "" && p.diskID != info.ID {
+		return info, errDiskNotFound
 	}
 
 	return info, nil
 }
@@ -299,7 +297,8 @@ func (p *xlStorageDiskIDCheck) MakeVolBulk(ctx context.Context, volumes ...strin
 	}
 	defer done(&err)
 
-	return p.storage.MakeVolBulk(ctx, volumes...)
+	w := xioutil.NewDeadlineWorker(diskMaxTimeout)
+	return w.Run(func() error { return p.storage.MakeVolBulk(ctx, volumes...) })
 }
 
 func (p *xlStorageDiskIDCheck) MakeVol(ctx context.Context, volume string) (err error) {
@@ -308,14 +307,9 @@ func (p *xlStorageDiskIDCheck) MakeVol(ctx context.Context, volume string) (err
 		return err
 	}
 	defer done(&err)
-	if contextCanceled(ctx) {
-		return ctx.Err()
-	}
-
-	if err = p.checkDiskStale(); err != nil {
-		return err
-	}
-	return p.storage.MakeVol(ctx, volume)
+
+	w := xioutil.NewDeadlineWorker(diskMaxTimeout)
+	return w.Run(func() error { return p.storage.MakeVol(ctx, volume) })
 }
 
 func (p *xlStorageDiskIDCheck) ListVols(ctx context.Context) (vi []VolInfo, err error) {
@@ -335,7 +329,13 @@ func (p *xlStorageDiskIDCheck) StatVol(ctx context.Context, volume string) (vol
 	}
 	defer done(&err)
 
-	return p.storage.StatVol(ctx, volume)
+	w := xioutil.NewDeadlineWorker(diskMaxTimeout)
+	err = w.Run(func() error {
+		var ierr error
+		vol, ierr = p.storage.StatVol(ctx, volume)
+		return ierr
+	})
+	return vol, err
 }
 
 func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, forceDelete bool) (err error) {
@@ -345,7 +345,8 @@ func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, for
 	}
 	defer done(&err)
 
-	return p.storage.DeleteVol(ctx, volume, forceDelete)
+	w := xioutil.NewDeadlineWorker(diskMaxTimeout)
+	return w.Run(func() error { return p.storage.DeleteVol(ctx, volume, forceDelete) })
 }
 
 func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath string, count int) (s []string, err error) {
@@ -405,7 +406,8 @@ func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPat
 	}
 	defer done(&err)
 
-	return p.storage.RenameFile(ctx, srcVolume, srcPath, dstVolume, dstPath)
+	w := xioutil.NewDeadlineWorker(diskMaxTimeout)
+	return w.Run(func() error { return p.storage.RenameFile(ctx, srcVolume, srcPath, dstVolume, dstPath) })
 }
 
 func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (sign uint64, err error) {
@@ -415,7 +417,13 @@ func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPat
 	}
 	defer done(&err)
 
-	return p.storage.RenameData(ctx, srcVolume, srcPath, fi, dstVolume, dstPath)
+	w := xioutil.NewDeadlineWorker(diskMaxTimeout)
+	err = w.Run(func() error {
+		var ierr error
+		sign, ierr = p.storage.RenameData(ctx, srcVolume, srcPath, fi, dstVolume, dstPath)
+		return ierr
+	})
+	return sign, err
 }
 
 func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, path string, fi FileInfo) (err error) {
@@ -425,7 +433,8 @@ func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, pa
 	}
 	defer done(&err)
 
-	return p.storage.CheckParts(ctx, volume, path, fi)
+	w := xioutil.NewDeadlineWorker(diskMaxTimeout)
+	return w.Run(func() error { return p.storage.CheckParts(ctx, volume, path, fi) })
 }
 
 func (p *xlStorageDiskIDCheck) Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) (err error) {
@@ -435,7 +444,8 @@ func (p *xlStorageDiskIDCheck) Delete(ctx context.Context, volume string, path s
 	}
 	defer done(&err)
 
-	return p.storage.Delete(ctx, volume, path, deleteOpts)
+	w := xioutil.NewDeadlineWorker(diskMaxTimeout)
+	return w.Run(func() error { return p.storage.Delete(ctx, volume, path, deleteOpts) })
 }
 
 // DeleteVersions deletes slice of versions, it can be same object
@@ -455,6 +465,7 @@ func (p *xlStorageDiskIDCheck) DeleteVersions(ctx context.Context, volume string
 		return errs
 	}
 	defer done(&err)
+
 	errs = p.storage.DeleteVersions(ctx, volume, versions)
 	for i := range errs {
 		if errs[i] != nil {
@@ -483,7 +494,8 @@ func (p *xlStorageDiskIDCheck) WriteAll(ctx context.Context, volume string, path
 	}
 	defer done(&err)
 
-	return p.storage.WriteAll(ctx, volume, path, b)
+	w := xioutil.NewDeadlineWorker(diskMaxTimeout)
+	return w.Run(func() error { return p.storage.WriteAll(ctx, volume, path, b) })
 }
 
 func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) {
@@ -493,7 +505,8 @@ func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path s
 	}
 	defer done(&err)
 
-	return p.storage.DeleteVersion(ctx, volume, path, fi, forceDelMarker)
+	w := xioutil.NewDeadlineWorker(diskMaxTimeout)
+	return w.Run(func() error { return p.storage.DeleteVersion(ctx, volume, path, fi, forceDelMarker) })
 }
 
 func (p *xlStorageDiskIDCheck) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
@@ -503,7 +516,8 @@ func (p *xlStorageDiskIDCheck) UpdateMetadata(ctx context.Context, volume, path
 	}
 	defer done(&err)
 
-	return p.storage.UpdateMetadata(ctx, volume, path, fi)
+	w := xioutil.NewDeadlineWorker(diskMaxTimeout)
+	return w.Run(func() error { return p.storage.UpdateMetadata(ctx, volume, path, fi) })
 }
 
 func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
@@ -513,7 +527,8 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s
 	}
 	defer done(&err)
 
-	return p.storage.WriteMetadata(ctx, volume, path, fi)
+	w := xioutil.NewDeadlineWorker(diskMaxTimeout)
+	return w.Run(func() error { return p.storage.WriteMetadata(ctx, volume, path, fi) })
 }
 
 func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
@@ -641,13 +656,29 @@ var diskMaxConcurrent = 512
 // the state of disks.
 var diskStartChecking = 32
 
+// diskMaxTimeout is the maximum wait time before we consider a drive
+// offline under active monitoring.
+var diskMaxTimeout = 2 * time.Minute
+
 func init() {
-	s := env.Get("_MINIO_DISK_MAX_CONCURRENT", "512")
-	diskMaxConcurrent, _ = strconv.Atoi(s)
-	if diskMaxConcurrent <= 0 {
-		logger.Info("invalid _MINIO_DISK_MAX_CONCURRENT value: %s, defaulting to '512'", s)
-		diskMaxConcurrent = 512
+	s := env.Get("_MINIO_DISK_MAX_CONCURRENT", "")
+	if s != "" {
+		diskMaxConcurrent, _ = strconv.Atoi(s)
+		if diskMaxConcurrent <= 0 {
+			logger.Info("invalid _MINIO_DISK_MAX_CONCURRENT value: %s, defaulting to '512'", s)
+			diskMaxConcurrent = 512
+		}
 	}
+	d := env.Get("_MINIO_DISK_MAX_TIMEOUT", "")
+	if d != "" {
+		timeoutOperation, _ := time.ParseDuration(d)
+		if timeoutOperation < time.Second {
+			logger.Info("invalid _MINIO_DISK_MAX_TIMEOUT value: %s, minimum can be 1s, defaulting to '2 minutes'", d)
+		} else {
+			diskMaxTimeout = timeoutOperation
+		}
+	}
+
 	diskStartChecking = 16 + diskMaxConcurrent/8
 	if diskStartChecking > diskMaxConcurrent {
 		diskStartChecking = diskMaxConcurrent
@@ -828,7 +859,7 @@ func (p *xlStorageDiskIDCheck) checkHealth(ctx context.Context) (err error) {
 	if t > maxTimeSinceLastSuccess {
 		if atomic.CompareAndSwapInt32(&p.health.status, diskHealthOK, diskHealthFaulty) {
 			logger.LogAlwaysIf(ctx, fmt.Errorf("node(%s): taking drive %s offline, time since last response %v", globalLocalNodeName, p.storage.String(), t.Round(time.Millisecond)))
-			go p.monitorDiskStatus()
+			go p.monitorDiskStatus(t)
 		}
 		return errFaultyDisk
 	}
@@ -837,9 +868,10 @@
 
 // monitorDiskStatus should be called once when a drive has been marked offline.
 // Once the disk has been deemed ok, it will return to online status.
-func (p *xlStorageDiskIDCheck) monitorDiskStatus() {
+func (p *xlStorageDiskIDCheck) monitorDiskStatus(spent time.Duration) {
 	t := time.NewTicker(5 * time.Second)
 	defer t.Stop()
+
 	fn := mustGetUUID()
 	for range t.C {
 		if len(p.health.tokens) == 0 {
@@ -859,8 +891,11 @@
 			Force: false,
 		})
 		if err == nil {
-			logger.Info("node(%s): Read/Write/Delete successful, bringing drive %s online. Drive was offline for %s.", globalLocalNodeName, p.storage.String(),
-				time.Since(time.Unix(0, atomic.LoadInt64(&p.health.lastSuccess))))
+			t := time.Unix(0, atomic.LoadInt64(&p.health.lastSuccess))
+			if spent > 0 {
+				t = t.Add(spent)
+			}
+			logger.Info("node(%s): Read/Write/Delete successful, bringing drive %s online. Drive was offline for %s.", globalLocalNodeName, p.storage.String(), time.Since(t))
 			atomic.StoreInt32(&p.health.status, diskHealthOK)
 			return
 		}
@@ -870,17 +905,26 @@
 // monitorDiskStatus should be called once when a drive has been marked offline.
 // Once the disk has been deemed ok, it will return to online status.
 func (p *xlStorageDiskIDCheck) monitorDiskWritable(ctx context.Context) {
-	const (
+	var (
 		// We check every 15 seconds if the disk is writable and we can read back.
 		checkEvery = 15 * time.Second
 
-		// Disk has 2 minutes to complete write+read.
-		timeoutOperation = 2 * time.Minute
-
 		// If the disk has completed an operation successfully within last 5 seconds, don't check it.
 		skipIfSuccessBefore = 5 * time.Second
 	)
 
+	// If the disk max timeout is smaller than the checkEvery window,
+	// reduce the check interval by a second.
+	if diskMaxTimeout <= checkEvery {
+		checkEvery -= time.Second
+	}
+
+	// If the disk max timeout is smaller than the skipIfSuccessBefore window,
+	// reduce skipIfSuccessBefore by a second.
+	if diskMaxTimeout <= skipIfSuccessBefore {
+		skipIfSuccessBefore -= time.Second
+	}
+
 	t := time.NewTicker(checkEvery)
 	defer t.Stop()
 	fn := mustGetUUID()
@@ -900,10 +944,10 @@ func (p *xlStorageDiskIDCheck) monitorDiskWritable(ctx context.Context) {
 			// We recently saw a success - no need to check.
 			continue
 		}
-		goOffline := func(err error) {
+		goOffline := func(err error, spent time.Duration) {
 			if atomic.CompareAndSwapInt32(&p.health.status, diskHealthOK, diskHealthFaulty) {
 				logger.LogAlwaysIf(ctx, fmt.Errorf("node(%s): taking drive %s offline: %v", globalLocalNodeName, p.storage.String(), err))
-				go p.monitorDiskStatus()
+				go p.monitorDiskStatus(spent)
 			}
 		}
 		// Offset checks a bit.
@@ -911,11 +955,11 @@
 		done := make(chan struct{})
 		started := time.Now()
 		go func() {
-			timeout := time.NewTimer(timeoutOperation)
+			timeout := time.NewTimer(diskMaxTimeout)
 			select {
 			case <-timeout.C:
 				spent := time.Since(started)
-				goOffline(fmt.Errorf("unable to write+read for %v", spent.Round(time.Millisecond)))
+				goOffline(fmt.Errorf("unable to write+read for %v", spent.Round(time.Millisecond)), spent)
 			case <-done:
 				if !timeout.Stop() {
 					<-timeout.C
@@ -927,14 +971,14 @@
 		err := p.storage.WriteAll(ctx, minioMetaTmpBucket, fn, toWrite)
 		if err != nil {
 			if osErrToFileErr(err) == errFaultyDisk {
-				goOffline(fmt.Errorf("unable to write: %w", err))
+				goOffline(fmt.Errorf("unable to write: %w", err), 0)
 			}
 			return
 		}
 		b, err := p.storage.ReadAll(context.Background(), minioMetaTmpBucket, fn)
 		if err != nil || len(b) != len(toWrite) {
			if osErrToFileErr(err) == errFaultyDisk {
-				goOffline(fmt.Errorf("unable to read: %w", err))
+				goOffline(fmt.Errorf("unable to read: %w", err), 0)
 			}
 			return
 		}
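
The new `_MINIO_DISK_MAX_TIMEOUT` knob feeds `diskMaxTimeout` through `time.ParseDuration`, rejecting anything under one second in favor of the 2-minute default. A quick standalone sketch of that parse-and-validate behavior, mirroring the `init()` above (the sample values are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	def := 2 * time.Minute
	// Mirrors init() above: a failed parse yields 0, and anything under
	// one second is rejected, keeping the default.
	for _, v := range []string{"30s", "500ms", "bogus", "5m"} {
		d, _ := time.ParseDuration(v)
		if d < time.Second {
			fmt.Printf("%-6s -> invalid, keeping %v\n", v, def)
			continue
		}
		fmt.Printf("%-6s -> accepted as %v\n", v, d)
	}
}
```
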
diff --git a/internal/ioutil/ioutil.go b/internal/ioutil/ioutil.go
index 18381fb2d..093385dd2 100644
--- a/internal/ioutil/ioutil.go
+++ b/internal/ioutil/ioutil.go
@@ -80,6 +80,50 @@ type DeadlineWriter struct {
 	err error
 }
 
+// DeadlineWorker implements the deadline/timeout resiliency pattern.
+type DeadlineWorker struct {
+	timeout time.Duration
+	err     error
+}
+
+// NewDeadlineWorker constructs a new DeadlineWorker with the given timeout.
+func NewDeadlineWorker(timeout time.Duration) *DeadlineWorker {
+	return &DeadlineWorker{
+		timeout: timeout,
+	}
+}
+
+// Run runs the given function until it finishes or until the deadline passes.
+// If the deadline passes before the function finishes, Run returns
+// context.Canceled and records it, so subsequent calls fail immediately.
+// Run does not (and cannot) kill the running function, so the goroutine may
+// keep running after the deadline passes; its eventual result is discarded.
+// If the function finishes before the deadline, its error is returned from Run.
+func (d *DeadlineWorker) Run(work func() error) error {
+	if d.err != nil {
+		return d.err
+	}
+
+	c := make(chan ioret, 1)
+	t := time.NewTimer(d.timeout)
+	go func() {
+		c <- ioret{0, work()}
+		close(c)
+	}()
+
+	select {
+	case r := <-c:
+		if !t.Stop() {
+			<-t.C
+		}
+		d.err = r.err
+		return r.err
+	case <-t.C:
+		d.err = context.Canceled
+		return context.Canceled
+	}
+}
+
 // NewDeadlineWriter wraps a writer to make it respect given deadline
 // value per Write(). If there is a blocking write, the returned Writer
 // will return whenever the timer hits (the return values are n=0
@@ -95,8 +139,6 @@ func (w *DeadlineWriter) Write(buf []byte) (int, error) {
 
 	c := make(chan ioret, 1)
 	t := time.NewTimer(w.timeout)
-	defer t.Stop()
-
 	go func() {
 		n, err := w.WriteCloser.Write(buf)
 		c <- ioret{n, err}
@@ -105,9 +147,13 @@
 
 	select {
 	case r := <-c:
+		if !t.Stop() {
+			<-t.C
+		}
 		w.err = r.err
 		return r.n, r.err
 	case <-t.C:
+		w.WriteCloser.Close()
 		w.err = context.Canceled
 		return 0, context.Canceled
 	}
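
Taken together, the ioutil additions give callers two bounded primitives: DeadlineWorker wraps a whole blocking call (as the xl-storage wrappers now do for MakeVol, StatVol, Delete, and friends), and DeadlineWriter bounds each individual Write (as newStreamingBitrotWriter now does). A usage sketch follows; it assumes compilation inside the MinIO module, since internal/ioutil is not importable from outside, and the timeout values are illustrative.

```go
package cmd

import (
	"fmt"
	"os"
	"time"

	xioutil "github.com/minio/minio/internal/ioutil"
)

func deadlineExamples() {
	// Bound a single blocking call to 2 minutes, mirroring how
	// xlStorageDiskIDCheck wraps its storage methods above.
	w := xioutil.NewDeadlineWorker(2 * time.Minute)
	err := w.Run(func() error {
		time.Sleep(10 * time.Millisecond) // stand-in for a slow storage call
		return nil
	})
	fmt.Println(err) // <nil> in time; context.Canceled after the deadline

	// Bound every Write on a stream, mirroring newStreamingBitrotWriter.
	f, _ := os.CreateTemp("", "deadline-*")
	defer os.Remove(f.Name())
	dw := xioutil.NewDeadlineWriter(f, time.Second)
	if _, err := dw.Write([]byte("hello")); err != nil {
		fmt.Println("write timed out:", err)
	}
}
```

Note the design choice in the Write change above: on timeout the writer now closes the underlying WriteCloser, so a blocked write cannot leak the goroutine forever, and every subsequent Write fails fast through the stored `w.err`.
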