diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index c2a73d07e..56a95e588 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -72,10 +72,12 @@ type healingTracker struct { // Numbers when current bucket started healing, // for resuming with correct numbers. - ResumeItemsHealed uint64 `json:"-"` - ResumeItemsFailed uint64 `json:"-"` - ResumeBytesDone uint64 `json:"-"` - ResumeBytesFailed uint64 `json:"-"` + ResumeItemsHealed uint64 `json:"-"` + ResumeItemsFailed uint64 `json:"-"` + ResumeItemsSkipped uint64 `json:"-"` + ResumeBytesDone uint64 `json:"-"` + ResumeBytesFailed uint64 `json:"-"` + ResumeBytesSkipped uint64 `json:"-"` // Filled on startup/restarts. QueuedBuckets []string @@ -90,6 +92,9 @@ type healingTracker struct { BytesSkipped uint64 RetryAttempts uint64 + + Finished bool // finished healing, whether with errors or not + // Add future tracking capabilities // Be sure that they are included in toHealingDisk } @@ -262,8 +267,10 @@ func (h *healingTracker) resume() { h.ItemsHealed = h.ResumeItemsHealed h.ItemsFailed = h.ResumeItemsFailed + h.ItemsSkipped = h.ResumeItemsSkipped h.BytesDone = h.ResumeBytesDone h.BytesFailed = h.ResumeBytesFailed + h.BytesSkipped = h.ResumeBytesSkipped } // bucketDone should be called when a bucket is done healing. @@ -274,8 +281,10 @@ func (h *healingTracker) bucketDone(bucket string) { h.ResumeItemsHealed = h.ItemsHealed h.ResumeItemsFailed = h.ItemsFailed + h.ResumeItemsSkipped = h.ItemsSkipped h.ResumeBytesDone = h.BytesDone h.ResumeBytesFailed = h.BytesFailed + h.ResumeBytesSkipped = h.BytesSkipped h.HealedBuckets = append(h.HealedBuckets, bucket) for i, b := range h.QueuedBuckets { if b == bucket { @@ -324,6 +333,7 @@ func (h *healingTracker) toHealingDisk() madmin.HealingDisk { PoolIndex: h.PoolIndex, SetIndex: h.SetIndex, DiskIndex: h.DiskIndex, + Finished: h.Finished, Path: h.Path, Started: h.Started.UTC(), LastUpdate: h.LastUpdate.UTC(), @@ -373,7 +383,7 @@ func getLocalDisksToHeal() (disksToHeal Endpoints) { disksToHeal = append(disksToHeal, disk.Endpoint()) continue } - if disk.Healing() != nil { + if h := disk.Healing(); h != nil && !h.Finished { disksToHeal = append(disksToHeal, disk.Endpoint()) } } @@ -519,7 +529,8 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint continue } if t.HealID == tracker.HealID { - t.delete(ctx) + t.Finished = true + t.update(ctx) } } diff --git a/cmd/background-newdisks-heal-ops_gen.go b/cmd/background-newdisks-heal-ops_gen.go index 52350eb39..17fc01b8a 100644 --- a/cmd/background-newdisks-heal-ops_gen.go +++ b/cmd/background-newdisks-heal-ops_gen.go @@ -132,6 +132,12 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "ResumeItemsFailed") return } + case "ResumeItemsSkipped": + z.ResumeItemsSkipped, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ResumeItemsSkipped") + return + } case "ResumeBytesDone": z.ResumeBytesDone, err = dc.ReadUint64() if err != nil { @@ -144,6 +150,12 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "ResumeBytesFailed") return } + case "ResumeBytesSkipped": + z.ResumeBytesSkipped, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ResumeBytesSkipped") + return + } case "QueuedBuckets": var zb0002 uint32 zb0002, err = dc.ReadArrayHeader() @@ -206,6 +218,12 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "RetryAttempts") return } + case "Finished": + z.Finished, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "Finished") + return + } default: err = dc.Skip() if err != nil { @@ -219,9 +237,9 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 26 + // map header, size 29 // write "ID" - err = en.Append(0xde, 0x0, 0x1a, 0xa2, 0x49, 0x44) + err = en.Append(0xde, 0x0, 0x1d, 0xa2, 0x49, 0x44) if err != nil { return } @@ -400,6 +418,16 @@ func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "ResumeItemsFailed") return } + // write "ResumeItemsSkipped" + err = en.Append(0xb2, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x53, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteUint64(z.ResumeItemsSkipped) + if err != nil { + err = msgp.WrapError(err, "ResumeItemsSkipped") + return + } // write "ResumeBytesDone" err = en.Append(0xaf, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65) if err != nil { @@ -420,6 +448,16 @@ func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "ResumeBytesFailed") return } + // write "ResumeBytesSkipped" + err = en.Append(0xb2, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x53, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteUint64(z.ResumeBytesSkipped) + if err != nil { + err = msgp.WrapError(err, "ResumeBytesSkipped") + return + } // write "QueuedBuckets" err = en.Append(0xad, 0x51, 0x75, 0x65, 0x75, 0x65, 0x64, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73) if err != nil { @@ -494,15 +532,25 @@ func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "RetryAttempts") return } + // write "Finished" + err = en.Append(0xa8, 0x46, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteBool(z.Finished) + if err != nil { + err = msgp.WrapError(err, "Finished") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 26 + // map header, size 29 // string "ID" - o = append(o, 0xde, 0x0, 0x1a, 0xa2, 0x49, 0x44) + o = append(o, 0xde, 0x0, 0x1d, 0xa2, 0x49, 0x44) o = msgp.AppendString(o, z.ID) // string "PoolIndex" o = append(o, 0xa9, 0x50, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x64, 0x65, 0x78) @@ -555,12 +603,18 @@ func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) { // string "ResumeItemsFailed" o = append(o, 0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) o = msgp.AppendUint64(o, z.ResumeItemsFailed) + // string "ResumeItemsSkipped" + o = append(o, 0xb2, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x53, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64) + o = msgp.AppendUint64(o, z.ResumeItemsSkipped) // string "ResumeBytesDone" o = append(o, 0xaf, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65) o = msgp.AppendUint64(o, z.ResumeBytesDone) // string "ResumeBytesFailed" o = append(o, 0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) o = msgp.AppendUint64(o, z.ResumeBytesFailed) + // string "ResumeBytesSkipped" + o = append(o, 0xb2, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x53, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64) + o = msgp.AppendUint64(o, z.ResumeBytesSkipped) // string "QueuedBuckets" o = append(o, 0xad, 0x51, 0x75, 0x65, 0x75, 0x65, 0x64, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73) o = msgp.AppendArrayHeader(o, uint32(len(z.QueuedBuckets))) @@ -585,6 +639,9 @@ func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) { // string "RetryAttempts" o = append(o, 0xad, 0x52, 0x65, 0x74, 0x72, 0x79, 0x41, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73) o = msgp.AppendUint64(o, z.RetryAttempts) + // string "Finished" + o = append(o, 0xa8, 0x46, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64) + o = msgp.AppendBool(o, z.Finished) return } @@ -714,6 +771,12 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "ResumeItemsFailed") return } + case "ResumeItemsSkipped": + z.ResumeItemsSkipped, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ResumeItemsSkipped") + return + } case "ResumeBytesDone": z.ResumeBytesDone, bts, err = msgp.ReadUint64Bytes(bts) if err != nil { @@ -726,6 +789,12 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "ResumeBytesFailed") return } + case "ResumeBytesSkipped": + z.ResumeBytesSkipped, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ResumeBytesSkipped") + return + } case "QueuedBuckets": var zb0002 uint32 zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) @@ -788,6 +857,12 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "RetryAttempts") return } + case "Finished": + z.Finished, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Finished") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -802,7 +877,7 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *healingTracker) Msgsize() (s int) { - s = 3 + 3 + msgp.StringPrefixSize + len(z.ID) + 10 + msgp.IntSize + 9 + msgp.IntSize + 10 + msgp.IntSize + 5 + msgp.StringPrefixSize + len(z.Path) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 8 + msgp.TimeSize + 11 + msgp.TimeSize + 18 + msgp.Uint64Size + 17 + msgp.Uint64Size + 12 + msgp.Uint64Size + 12 + msgp.Uint64Size + 10 + msgp.Uint64Size + 12 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Object) + 18 + msgp.Uint64Size + 18 + msgp.Uint64Size + 16 + msgp.Uint64Size + 18 + msgp.Uint64Size + 14 + msgp.ArrayHeaderSize + s = 3 + 3 + msgp.StringPrefixSize + len(z.ID) + 10 + msgp.IntSize + 9 + msgp.IntSize + 10 + msgp.IntSize + 5 + msgp.StringPrefixSize + len(z.Path) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 8 + msgp.TimeSize + 11 + msgp.TimeSize + 18 + msgp.Uint64Size + 17 + msgp.Uint64Size + 12 + msgp.Uint64Size + 12 + msgp.Uint64Size + 10 + msgp.Uint64Size + 12 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Object) + 18 + msgp.Uint64Size + 18 + msgp.Uint64Size + 19 + msgp.Uint64Size + 16 + msgp.Uint64Size + 18 + msgp.Uint64Size + 19 + msgp.Uint64Size + 14 + msgp.ArrayHeaderSize for za0001 := range z.QueuedBuckets { s += msgp.StringPrefixSize + len(z.QueuedBuckets[za0001]) } @@ -810,6 +885,6 @@ func (z *healingTracker) Msgsize() (s int) { for za0002 := range z.HealedBuckets { s += msgp.StringPrefixSize + len(z.HealedBuckets[za0002]) } - s += 7 + msgp.StringPrefixSize + len(z.HealID) + 13 + msgp.Uint64Size + 13 + msgp.Uint64Size + 14 + msgp.Uint64Size + s += 7 + msgp.StringPrefixSize + len(z.HealID) + 13 + msgp.Uint64Size + 13 + msgp.Uint64Size + 14 + msgp.Uint64Size + 9 + msgp.BoolSize return } diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 253a5bdcb..2552f67d7 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -230,8 +230,11 @@ func (s *erasureSets) connectDisks(log bool) { } return } - if disk.IsLocal() && disk.Healing() != nil { - globalBackgroundHealState.pushHealLocalDisks(disk.Endpoint()) + if disk.IsLocal() { + h := disk.Healing() + if h != nil && !h.Finished { + globalBackgroundHealState.pushHealLocalDisks(disk.Endpoint()) + } } s.erasureDisksMu.Lock() setIndex, diskIndex, err := findDiskIndex(s.format, format) diff --git a/cmd/erasure.go b/cmd/erasure.go index e78ce6ef4..1c20d010f 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -203,11 +203,9 @@ func getDisksInfo(disks []StorageAPI, endpoints []Endpoint, metrics bool) (disks di.State = diskErrToDriveState(err) di.FreeInodes = info.FreeInodes di.UsedInodes = info.UsedInodes - if info.Healing { - if hi := disks[index].Healing(); hi != nil { - hd := hi.toHealingDisk() - di.HealInfo = &hd - } + if hi := disks[index].Healing(); hi != nil { + hd := hi.toHealingDisk() + di.HealInfo = &hd } di.Metrics = &madmin.DiskMetrics{ LastMinute: make(map[string]madmin.TimedAction, len(info.Metrics.LastMinute)), diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 10a8cc760..49449e190 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -227,7 +227,11 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, // Collect updates to tracker from concurrent healEntry calls results := make(chan healEntryResult, 1000) - defer close(results) + quitting := make(chan struct{}) + defer func() { + close(results) + <-quitting + }() go func() { for res := range results { @@ -241,6 +245,9 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, tracker.updateProgress(res.success, res.skipped, res.bytes) } + + healingLogIf(ctx, tracker.update(ctx)) + close(quitting) }() var retErr error diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index c75e94c15..9ae4a356b 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -341,7 +341,13 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) { // Healing is 'true' when // - if we found an unformatted disk (no 'format.json') // - if we found healing tracker 'healing.bin' - dcinfo.Healing = errors.Is(err, errUnformattedDisk) || (s.Healing() != nil) + dcinfo.Healing = errors.Is(err, errUnformattedDisk) + if !dcinfo.Healing { + if hi := s.Healing(); hi != nil && !hi.Finished { + dcinfo.Healing = true + } + } + dcinfo.ID = diskID return dcinfo, err },