From 85c3db3a93c0183051f1c9153aa61acb1b21ff62 Mon Sep 17 00:00:00 2001 From: Anis Eleuch Date: Tue, 20 Aug 2024 16:42:49 +0100 Subject: [PATCH] heal: Add finished flag to .healing.bin to avoid removing it later (#20250) Sometimes, we need historical information in .healing.bin, such as the number of expired objects that healing skips and that can create drive usage disparity in the same erasure set. For that reason, this commit will not remove .healing.bin anymore and it will have a new field called Finished so we know healing is finished in that drive. --- cmd/background-newdisks-heal-ops.go | 23 +++++-- cmd/background-newdisks-heal-ops_gen.go | 87 +++++++++++++++++++++++-- cmd/erasure-sets.go | 7 +- cmd/erasure.go | 8 +-- cmd/global-heal.go | 9 ++- cmd/xl-storage.go | 8 ++- 6 files changed, 121 insertions(+), 21 deletions(-) diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index c2a73d07e..56a95e588 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -72,10 +72,12 @@ type healingTracker struct { // Numbers when current bucket started healing, // for resuming with correct numbers. - ResumeItemsHealed uint64 `json:"-"` - ResumeItemsFailed uint64 `json:"-"` - ResumeBytesDone uint64 `json:"-"` - ResumeBytesFailed uint64 `json:"-"` + ResumeItemsHealed uint64 `json:"-"` + ResumeItemsFailed uint64 `json:"-"` + ResumeItemsSkipped uint64 `json:"-"` + ResumeBytesDone uint64 `json:"-"` + ResumeBytesFailed uint64 `json:"-"` + ResumeBytesSkipped uint64 `json:"-"` // Filled on startup/restarts. 
QueuedBuckets []string @@ -90,6 +92,9 @@ type healingTracker struct { BytesSkipped uint64 RetryAttempts uint64 + + Finished bool // finished healing, whether with errors or not + // Add future tracking capabilities // Be sure that they are included in toHealingDisk } @@ -262,8 +267,10 @@ func (h *healingTracker) resume() { h.ItemsHealed = h.ResumeItemsHealed h.ItemsFailed = h.ResumeItemsFailed + h.ItemsSkipped = h.ResumeItemsSkipped h.BytesDone = h.ResumeBytesDone h.BytesFailed = h.ResumeBytesFailed + h.BytesSkipped = h.ResumeBytesSkipped } // bucketDone should be called when a bucket is done healing. @@ -274,8 +281,10 @@ func (h *healingTracker) bucketDone(bucket string) { h.ResumeItemsHealed = h.ItemsHealed h.ResumeItemsFailed = h.ItemsFailed + h.ResumeItemsSkipped = h.ItemsSkipped h.ResumeBytesDone = h.BytesDone h.ResumeBytesFailed = h.BytesFailed + h.ResumeBytesSkipped = h.BytesSkipped h.HealedBuckets = append(h.HealedBuckets, bucket) for i, b := range h.QueuedBuckets { if b == bucket { @@ -324,6 +333,7 @@ func (h *healingTracker) toHealingDisk() madmin.HealingDisk { PoolIndex: h.PoolIndex, SetIndex: h.SetIndex, DiskIndex: h.DiskIndex, + Finished: h.Finished, Path: h.Path, Started: h.Started.UTC(), LastUpdate: h.LastUpdate.UTC(), @@ -373,7 +383,7 @@ func getLocalDisksToHeal() (disksToHeal Endpoints) { disksToHeal = append(disksToHeal, disk.Endpoint()) continue } - if disk.Healing() != nil { + if h := disk.Healing(); h != nil && !h.Finished { disksToHeal = append(disksToHeal, disk.Endpoint()) } } @@ -519,7 +529,8 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint continue } if t.HealID == tracker.HealID { - t.delete(ctx) + t.Finished = true + t.update(ctx) } } diff --git a/cmd/background-newdisks-heal-ops_gen.go b/cmd/background-newdisks-heal-ops_gen.go index 52350eb39..17fc01b8a 100644 --- a/cmd/background-newdisks-heal-ops_gen.go +++ b/cmd/background-newdisks-heal-ops_gen.go @@ -132,6 +132,12 @@ func (z *healingTracker) 
DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "ResumeItemsFailed") return } + case "ResumeItemsSkipped": + z.ResumeItemsSkipped, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ResumeItemsSkipped") + return + } case "ResumeBytesDone": z.ResumeBytesDone, err = dc.ReadUint64() if err != nil { @@ -144,6 +150,12 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "ResumeBytesFailed") return } + case "ResumeBytesSkipped": + z.ResumeBytesSkipped, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ResumeBytesSkipped") + return + } case "QueuedBuckets": var zb0002 uint32 zb0002, err = dc.ReadArrayHeader() @@ -206,6 +218,12 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "RetryAttempts") return } + case "Finished": + z.Finished, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "Finished") + return + } default: err = dc.Skip() if err != nil { @@ -219,9 +237,9 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 26 + // map header, size 29 // write "ID" - err = en.Append(0xde, 0x0, 0x1a, 0xa2, 0x49, 0x44) + err = en.Append(0xde, 0x0, 0x1d, 0xa2, 0x49, 0x44) if err != nil { return } @@ -400,6 +418,16 @@ func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "ResumeItemsFailed") return } + // write "ResumeItemsSkipped" + err = en.Append(0xb2, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x53, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteUint64(z.ResumeItemsSkipped) + if err != nil { + err = msgp.WrapError(err, "ResumeItemsSkipped") + return + } // write "ResumeBytesDone" err = en.Append(0xaf, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 
0x65) if err != nil { @@ -420,6 +448,16 @@ func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "ResumeBytesFailed") return } + // write "ResumeBytesSkipped" + err = en.Append(0xb2, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x53, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteUint64(z.ResumeBytesSkipped) + if err != nil { + err = msgp.WrapError(err, "ResumeBytesSkipped") + return + } // write "QueuedBuckets" err = en.Append(0xad, 0x51, 0x75, 0x65, 0x75, 0x65, 0x64, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73) if err != nil { @@ -494,15 +532,25 @@ func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "RetryAttempts") return } + // write "Finished" + err = en.Append(0xa8, 0x46, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteBool(z.Finished) + if err != nil { + err = msgp.WrapError(err, "Finished") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 26 + // map header, size 29 // string "ID" - o = append(o, 0xde, 0x0, 0x1a, 0xa2, 0x49, 0x44) + o = append(o, 0xde, 0x0, 0x1d, 0xa2, 0x49, 0x44) o = msgp.AppendString(o, z.ID) // string "PoolIndex" o = append(o, 0xa9, 0x50, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x64, 0x65, 0x78) @@ -555,12 +603,18 @@ func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) { // string "ResumeItemsFailed" o = append(o, 0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) o = msgp.AppendUint64(o, z.ResumeItemsFailed) + // string "ResumeItemsSkipped" + o = append(o, 0xb2, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x53, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64) + o = msgp.AppendUint64(o, z.ResumeItemsSkipped) // string "ResumeBytesDone" o = append(o, 0xaf, 0x52, 
0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65) o = msgp.AppendUint64(o, z.ResumeBytesDone) // string "ResumeBytesFailed" o = append(o, 0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) o = msgp.AppendUint64(o, z.ResumeBytesFailed) + // string "ResumeBytesSkipped" + o = append(o, 0xb2, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x53, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64) + o = msgp.AppendUint64(o, z.ResumeBytesSkipped) // string "QueuedBuckets" o = append(o, 0xad, 0x51, 0x75, 0x65, 0x75, 0x65, 0x64, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73) o = msgp.AppendArrayHeader(o, uint32(len(z.QueuedBuckets))) @@ -585,6 +639,9 @@ func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) { // string "RetryAttempts" o = append(o, 0xad, 0x52, 0x65, 0x74, 0x72, 0x79, 0x41, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73) o = msgp.AppendUint64(o, z.RetryAttempts) + // string "Finished" + o = append(o, 0xa8, 0x46, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64) + o = msgp.AppendBool(o, z.Finished) return } @@ -714,6 +771,12 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "ResumeItemsFailed") return } + case "ResumeItemsSkipped": + z.ResumeItemsSkipped, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ResumeItemsSkipped") + return + } case "ResumeBytesDone": z.ResumeBytesDone, bts, err = msgp.ReadUint64Bytes(bts) if err != nil { @@ -726,6 +789,12 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "ResumeBytesFailed") return } + case "ResumeBytesSkipped": + z.ResumeBytesSkipped, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ResumeBytesSkipped") + return + } case "QueuedBuckets": var zb0002 uint32 zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) @@ -788,6 +857,12 @@ func (z *healingTracker) 
UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "RetryAttempts") return } + case "Finished": + z.Finished, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Finished") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -802,7 +877,7 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *healingTracker) Msgsize() (s int) { - s = 3 + 3 + msgp.StringPrefixSize + len(z.ID) + 10 + msgp.IntSize + 9 + msgp.IntSize + 10 + msgp.IntSize + 5 + msgp.StringPrefixSize + len(z.Path) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 8 + msgp.TimeSize + 11 + msgp.TimeSize + 18 + msgp.Uint64Size + 17 + msgp.Uint64Size + 12 + msgp.Uint64Size + 12 + msgp.Uint64Size + 10 + msgp.Uint64Size + 12 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Object) + 18 + msgp.Uint64Size + 18 + msgp.Uint64Size + 16 + msgp.Uint64Size + 18 + msgp.Uint64Size + 14 + msgp.ArrayHeaderSize + s = 3 + 3 + msgp.StringPrefixSize + len(z.ID) + 10 + msgp.IntSize + 9 + msgp.IntSize + 10 + msgp.IntSize + 5 + msgp.StringPrefixSize + len(z.Path) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 8 + msgp.TimeSize + 11 + msgp.TimeSize + 18 + msgp.Uint64Size + 17 + msgp.Uint64Size + 12 + msgp.Uint64Size + 12 + msgp.Uint64Size + 10 + msgp.Uint64Size + 12 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Object) + 18 + msgp.Uint64Size + 18 + msgp.Uint64Size + 19 + msgp.Uint64Size + 16 + msgp.Uint64Size + 18 + msgp.Uint64Size + 19 + msgp.Uint64Size + 14 + msgp.ArrayHeaderSize for za0001 := range z.QueuedBuckets { s += msgp.StringPrefixSize + len(z.QueuedBuckets[za0001]) } @@ -810,6 +885,6 @@ func (z *healingTracker) Msgsize() (s int) { for za0002 := range z.HealedBuckets { s += msgp.StringPrefixSize + len(z.HealedBuckets[za0002]) 
} - s += 7 + msgp.StringPrefixSize + len(z.HealID) + 13 + msgp.Uint64Size + 13 + msgp.Uint64Size + 14 + msgp.Uint64Size + s += 7 + msgp.StringPrefixSize + len(z.HealID) + 13 + msgp.Uint64Size + 13 + msgp.Uint64Size + 14 + msgp.Uint64Size + 9 + msgp.BoolSize return } diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 253a5bdcb..2552f67d7 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -230,8 +230,11 @@ func (s *erasureSets) connectDisks(log bool) { } return } - if disk.IsLocal() && disk.Healing() != nil { - globalBackgroundHealState.pushHealLocalDisks(disk.Endpoint()) + if disk.IsLocal() { + h := disk.Healing() + if h != nil && !h.Finished { + globalBackgroundHealState.pushHealLocalDisks(disk.Endpoint()) + } } s.erasureDisksMu.Lock() setIndex, diskIndex, err := findDiskIndex(s.format, format) diff --git a/cmd/erasure.go b/cmd/erasure.go index e78ce6ef4..1c20d010f 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -203,11 +203,9 @@ func getDisksInfo(disks []StorageAPI, endpoints []Endpoint, metrics bool) (disks di.State = diskErrToDriveState(err) di.FreeInodes = info.FreeInodes di.UsedInodes = info.UsedInodes - if info.Healing { - if hi := disks[index].Healing(); hi != nil { - hd := hi.toHealingDisk() - di.HealInfo = &hd - } + if hi := disks[index].Healing(); hi != nil { + hd := hi.toHealingDisk() + di.HealInfo = &hd } di.Metrics = &madmin.DiskMetrics{ LastMinute: make(map[string]madmin.TimedAction, len(info.Metrics.LastMinute)), diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 10a8cc760..49449e190 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -227,7 +227,11 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, // Collect updates to tracker from concurrent healEntry calls results := make(chan healEntryResult, 1000) - defer close(results) + quitting := make(chan struct{}) + defer func() { + close(results) + <-quitting + }() go func() { for res := range results { @@ -241,6 +245,9 @@ func (er 
*erasureObjects) healErasureSet(ctx context.Context, buckets []string, tracker.updateProgress(res.success, res.skipped, res.bytes) } + + healingLogIf(ctx, tracker.update(ctx)) + close(quitting) }() var retErr error diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index c75e94c15..9ae4a356b 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -341,7 +341,13 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) { // Healing is 'true' when // - if we found an unformatted disk (no 'format.json') // - if we found healing tracker 'healing.bin' - dcinfo.Healing = errors.Is(err, errUnformattedDisk) || (s.Healing() != nil) + dcinfo.Healing = errors.Is(err, errUnformattedDisk) + if !dcinfo.Healing { + if hi := s.Healing(); hi != nil && !hi.Finished { + dcinfo.Healing = true + } + } + dcinfo.ID = diskID return dcinfo, err },