From ecc039079548788c75fbe91cec28e7cd060eaedf Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 16 Apr 2026 12:50:30 -0700 Subject: [PATCH] fix(master): eagerly remove volume from writable when assign hits limit (#9108) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(master): eagerly remove volume from writable when RecordAssign hits limit Previously, a volume was only removed from the writable list by the heartbeat-driven CollectDeadNodeAndFullVolumes pass, which runs every pulse (5s) after a 5s heartbeat. Under sustained concurrent writes, fio-style workloads observed in the field grew volumes 8-20x past the configured 100MB limit (median 530MB, peak 1.98GB) during that 5-15s detection window. RecordAssign already tracks effective size (reported + pending) on each /dir/assign. It now also removes the volume from writable the moment effectiveSize reaches volumeSizeLimit, and mirrors the activeVolumeCount decrement that Topology.SetVolumeCapacityFull would have done on the next heartbeat. The heartbeat path remains unchanged and idempotent (vl.SetVolumeCapacityFull returns false if already removed, so no double-decrement). Recovery still works: if a heartbeat later reports size < limit and the volume is not oversized, EnsureCorrectWritables adds it back. - weed/topology/volume_layout.go: RecordAssign returns reachedCapacity bool; adds AdjustActiveVolumeCountForFull helper. - weed/topology/topology.go: PickForWrite invokes the decrement on eager full transitions. - TestPickForWrite: pass a 1024-byte hint instead of 0 so the default 1MB pendingDelta does not immediately bust the test's 32KB limit. - New TestRecordAssignReachingCapacityRemovesFromWritable covers the eager removal, active count accounting, and no-double-accounting. * fix(master): recover eagerly-removed volume once decay clears pending After RecordAssign eagerly removes a volume from writables because effectiveSize reached the limit, decay can later bring effectiveSize back under the limit (e.g., when a burst of assigns didn't all result in uploads). Without recovery the volume would stay non-writable until vacuum or a ReadOnly flip. UpdateVolumeSize now re-adds the volume to writables once all of the following hold: * RecordAssign is what removed it (tracked via fullSince timestamp) * at least capacityRecoveryDelay has elapsed since the removal (30s) — this prevents bouncing during a steady stream of assigns near the limit * effectiveSize has decayed below the crowded threshold (90% of limit) * reportedSize is under the limit (actual disk is not over) * standard EnsureCorrectWritables preconditions: enough copies, all copies writable, not oversized The caller (SyncDataNodeRegistration) re-increments activeVolumeCount symmetrically with the decrement done on eager removal. * review: release VolumeLayout lock before UpAdjustDiskUsageDelta adjustActiveVolumeCount held vl.accessLock across the tree-climbing UpAdjustDiskUsageDelta walk. That walk takes per-level DiskUsages locks and could be re-entered from other call paths that hold a node-level lock and then acquire vl.accessLock. Copy the node list under the VolumeLayout lock and release it before the tree walk to eliminate the lock-ordering hazard. --- weed/topology/topology.go | 12 +- weed/topology/volume_growth_test.go | 8 +- weed/topology/volume_layout.go | 120 ++++++++++++++- weed/topology/volume_layout_pick_test.go | 185 +++++++++++++++++++++++ 4 files changed, 313 insertions(+), 12 deletions(-) diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 30fbfc1f0..5c779f343 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -354,7 +354,9 @@ func (t *Topology) PickForWrite(requestedCount uint64, option *VolumeGrowOption, sizePerFile = expectedDataSize } pendingBytes := min(uint64(count)*sizePerFile, uint64(math.MaxInt64)) - volumeLayout.RecordAssign(vid, int64(pendingBytes)) + if volumeLayout.RecordAssign(vid, int64(pendingBytes)) { + volumeLayout.AdjustActiveVolumeCountForFull(vid) + } nextFileId := t.Sequence.NextFileId(requestedCount) fileId = needle.NewFileId(vid, nextFileId, rand.Uint32()).String() return fileId, count, volumeLocationList, shouldGrow, nil @@ -512,14 +514,18 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) vl.EnsureCorrectWritables(&v) } - // Update effective sizes for all reported volumes (decay pending estimates) + // Update effective sizes for all reported volumes (decay pending estimates). + // If decay brings a volume eagerly removed by RecordAssign back under the + // writable threshold, restore the matching activeVolumeCount. for _, v := range volumeInfos { if v.ReplicaPlacement == nil { continue } diskType := types.ToDiskType(v.DiskType) vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) - vl.UpdateVolumeSize(v.Id, v.Size, v.CompactRevision) + if vl.UpdateVolumeSize(v.Id, v.Size, v.CompactRevision) { + vl.AdjustActiveVolumeCountAfterRecovery(v.Id) + } } return } diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index bc9c0c640..d0b30c2ca 100644 --- a/weed/topology/volume_growth_test.go +++ b/weed/topology/volume_growth_test.go @@ -433,7 +433,13 @@ func TestPickForWrite(t *testing.T) { continue } volumeGrowOption.DataNode = dn - fileId, count, _, shouldGrow, err := topo.PickForWrite(1, volumeGrowOption, vl, 0) + // Small expectedDataSize hint: this test iterates many times + // and the layout uses a 32KB volume size limit. With hint=0 + // (default 1MB pendingDelta), RecordAssign would exceed the + // limit on the first call and eagerly remove the volume + // from writable. Keep the hint tiny so effectiveSize stays + // below the limit across all iterations. + fileId, count, _, shouldGrow, err := topo.PickForWrite(1, volumeGrowOption, vl, 1024) if dc == "dc0" { if err == nil || count != 0 || !shouldGrow { fmt.Println(dc, r, dn, "pick for write should be with error") diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 39602502f..48e75ec82 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -108,8 +108,16 @@ type volumeSizeTracking struct { reportedSize uint64 // last heartbeat-reported size (dedup replicas) compactRevision uint32 // detect compaction to reset instead of decay lastUpdateTime time.Time // dedup replicas within the same heartbeat cycle + fullSince time.Time // non-zero while the volume is eagerly marked full by RecordAssign } +// capacityRecoveryDelay is the minimum time a volume must stay out of the +// writable list after being eagerly removed by RecordAssign before it can +// be considered for re-addition by heartbeat-driven decay. Combined with +// the effectiveSize hysteresis band, this avoids bouncing the volume in +// and out of writable within a single burst of assigns. +const capacityRecoveryDelay = 30 * time.Second + // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { growRequest atomic.Bool @@ -210,7 +218,12 @@ func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo, dn *DataN // If the compact revision changed, the size drop is from compaction (not // pending writes), so we reset effectiveSize to the reported size instead of // decaying. -func (vl *VolumeLayout) UpdateVolumeSize(vid needle.VolumeId, reportedSize uint64, compactRevision uint32) { +// +// Returns recoveredToWritable = true when decay brought a volume that was +// previously eagerly removed by RecordAssign back under the writable +// threshold and this call re-added it to the writable list. The caller +// should mirror the activeVolumeCount bookkeeping. +func (vl *VolumeLayout) UpdateVolumeSize(vid needle.VolumeId, reportedSize uint64, compactRevision uint32) (recoveredToWritable bool) { vl.accessLock.Lock() defer vl.accessLock.Unlock() @@ -225,7 +238,7 @@ func (vl *VolumeLayout) UpdateVolumeSize(vid needle.VolumeId, reportedSize uint6 } vl.sizeTracking[vid] = st } else if now.Sub(st.lastUpdateTime) < 2*time.Second { - return // duplicate replica in the same heartbeat cycle + return false // duplicate replica in the same heartbeat cycle } else { st.lastUpdateTime = now st.reportedSize = reportedSize @@ -240,11 +253,40 @@ func (vl *VolumeLayout) UpdateVolumeSize(vid needle.VolumeId, reportedSize uint6 } } - if float64(st.effectiveSize) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold { + crowdedThreshold := uint64(float64(vl.volumeSizeLimit) * VolumeGrowStrategy.Threshold) + if st.effectiveSize > crowdedThreshold { vl.setVolumeCrowded(vid) - } else { - vl.removeFromCrowded(vid) + return false } + vl.removeFromCrowded(vid) + + // Recovery path: if we eagerly removed this volume from writables in + // RecordAssign, decay may now have brought effectiveSize back under + // the crowded threshold. Re-add it — but only after the recovery + // delay has elapsed, so a steady stream of assigns near the limit + // does not bounce the volume in and out of writables. + if st.fullSince.IsZero() { + return false + } + if now.Sub(st.fullSince) < capacityRecoveryDelay { + return false + } + if reportedSize >= vl.volumeSizeLimit { + return false // actual on-disk size still over limit; stay out + } + if vl.oversizedVolumes.IsTrue(vid) { + return false + } + if !vl.enoughCopies(vid) || !vl.isAllWritable(vid) { + return false + } + if !vl.setVolumeWritable(vid) { + return false // already writable (shouldn't happen, but be safe) + } + st.fullSince = time.Time{} + glog.V(0).Infof("Volume %d recovered to writable (effective=%d, reported=%d, limit=%d).", + vid, st.effectiveSize, reportedSize, vl.volumeSizeLimit) + return true } func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { @@ -325,21 +367,83 @@ func (vl *VolumeLayout) isCrowdedVolume(v *storage.VolumeInfo) bool { } // RecordAssign adds the estimated byte size to the volume's tracked effective -// size and marks it crowded if it crosses the threshold. -func (vl *VolumeLayout) RecordAssign(vid needle.VolumeId, pendingDelta int64) { +// size and updates the volume's writable state: +// +// - at the crowded threshold (e.g. 90%): marks crowded to trigger growth. +// - at the hard limit (100%): removes the volume from the writable list +// immediately so new assigns stop landing on it. Returns true if this +// call was what removed the volume, so the caller can mirror the +// disk-usage accounting done by Topology.SetVolumeCapacityFull. +// +// Removing eagerly here avoids waiting for the heartbeat-driven +// CollectDeadNodeAndFullVolumes cycle (5–15s detection latency) during +// which a fast writer could push the volume far past the configured limit. +func (vl *VolumeLayout) RecordAssign(vid needle.VolumeId, pendingDelta int64) (reachedCapacity bool) { vl.accessLock.Lock() defer vl.accessLock.Unlock() st := vl.sizeTracking[vid] if st == nil { - return + return false } if pendingDelta > 0 { st.effectiveSize += uint64(pendingDelta) } + if st.effectiveSize >= vl.volumeSizeLimit { + if vl.removeFromWritable(vid) { + st.fullSince = time.Now() + glog.V(0).Infof("Volume %d reaches full capacity (effective=%d, limit=%d).", + vid, st.effectiveSize, vl.volumeSizeLimit) + return true + } + return false + } if float64(st.effectiveSize) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold { vl.setVolumeCrowded(vid) } + return false +} + +// AdjustActiveVolumeCountForFull decrements the active volume count on each +// data node holding this volume. Mirrors the accounting done in +// Topology.SetVolumeCapacityFull for the heartbeat-driven path. Call only +// after RecordAssign returns true for the same vid. +func (vl *VolumeLayout) AdjustActiveVolumeCountForFull(vid needle.VolumeId) { + vl.adjustActiveVolumeCount(vid, -1) +} + +// AdjustActiveVolumeCountAfterRecovery increments the active volume count on +// each data node holding this volume. Mirrors +// AdjustActiveVolumeCountForFull for the recovery path. Call only after +// UpdateVolumeSize returns true for the same vid. +func (vl *VolumeLayout) AdjustActiveVolumeCountAfterRecovery(vid needle.VolumeId) { + vl.adjustActiveVolumeCount(vid, +1) +} + +func (vl *VolumeLayout) adjustActiveVolumeCount(vid needle.VolumeId, delta int64) { + // Copy the node list under the VolumeLayout lock, then release it before + // calling UpAdjustDiskUsageDelta. UpAdjustDiskUsageDelta walks up the + // topology tree taking per-level locks (e.g., DiskUsages.Lock on each + // node). Keeping vl.accessLock held across that tree walk is an + // unnecessary lock-ordering hazard — other call paths that hold a + // topology-level lock and then need vl.accessLock would deadlock. + vl.accessLock.RLock() + vidLocations, found := vl.vid2location[vid] + if !found { + vl.accessLock.RUnlock() + return + } + nodes := make([]*DataNode, len(vidLocations.list)) + copy(nodes, vidLocations.list) + vl.accessLock.RUnlock() + + diskTypeStr := string(vl.diskType) + for _, dn := range nodes { + disk := dn.getOrCreateDisk(diskTypeStr) + disk.UpAdjustDiskUsageDelta(vl.diskType, &DiskUsageCounts{ + activeVolumeCount: delta, + }) + } } const maxDrainWait = 30 * time.Second diff --git a/weed/topology/volume_layout_pick_test.go b/weed/topology/volume_layout_pick_test.go index f63878c9e..ce13c6c22 100644 --- a/weed/topology/volume_layout_pick_test.go +++ b/weed/topology/volume_layout_pick_test.go @@ -313,6 +313,191 @@ func TestRecordAssignMarksCrowded(t *testing.T) { } } +func TestRecordAssignReachingCapacityRemovesFromWritable(t *testing.T) { + layout := ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":5000, "replication":"000"}, + {"id":2, "size":5000, "replication":"000"} + ], + "limit":10 + } + } + } +} +` + topo, vl := setupPickTest(t, layout, 10000) + + writable, _ := vl.GetWritableVolumeCount() + if writable != 2 { + t.Fatalf("expected 2 writable volumes initially, got %d", writable) + } + // Each volume counts as active initially. + initialActive := topo.diskUsages.usages[types.HardDriveType].activeVolumeCount + + // Push vid 1 past the hard limit (5000 + 5000 = 10000 == limit). + reachedCapacity := vl.RecordAssign(1, 5000) + if !reachedCapacity { + t.Fatalf("RecordAssign should return true when effectiveSize reaches limit") + } + vl.AdjustActiveVolumeCountForFull(1) + + writable, _ = vl.GetWritableVolumeCount() + if writable != 1 { + t.Errorf("expected 1 writable after eager removal, got %d", writable) + } + // activeVolumeCount should be decremented for the data node holding vid 1. + afterActive := topo.diskUsages.usages[types.HardDriveType].activeVolumeCount + if afterActive != initialActive-1 { + t.Errorf("expected activeVolumeCount=%d, got %d", initialActive-1, afterActive) + } + + // A second RecordAssign on the already-removed volume should not return + // true again (no double accounting). + if vl.RecordAssign(1, 10000) { + t.Errorf("RecordAssign should not report reachedCapacity twice for the same removal") + } + afterSecond := topo.diskUsages.usages[types.HardDriveType].activeVolumeCount + if afterSecond != afterActive { + t.Errorf("activeVolumeCount changed on second RecordAssign: before=%d after=%d", afterActive, afterSecond) + } +} + +// advanceSizeTrackingClock backdates a volume's time-sensitive fields by d +// so heartbeat decay and the recovery delay fire on the next update. +func advanceSizeTrackingClock(vl *VolumeLayout, vid needle.VolumeId, d time.Duration) { + vl.accessLock.Lock() + defer vl.accessLock.Unlock() + st := vl.sizeTracking[vid] + if st == nil { + return + } + if !st.lastUpdateTime.IsZero() { + st.lastUpdateTime = st.lastUpdateTime.Add(-d) + } + if !st.fullSince.IsZero() { + st.fullSince = st.fullSince.Add(-d) + } +} + +func TestUpdateVolumeSizeRecoversEagerlyRemovedVolume(t *testing.T) { + // Two writable volumes, each at 40% of a 10000-byte limit. Push vid 1 + // past the hard limit via RecordAssign, then heartbeat with an + // unchanged reported size so decay shrinks effectiveSize below the + // crowded threshold (90% of limit). After the recovery delay, the + // volume should be re-added to writables and activeVolumeCount + // restored. + layout := ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":4000, "replication":"000"}, + {"id":2, "size":4000, "replication":"000"} + ], + "limit":10 + } + } + } +} +` + topo, vl := setupPickTest(t, layout, 10000) + + initialActive := topo.diskUsages.usages[types.HardDriveType].activeVolumeCount + initialWritables, _ := vl.GetWritableVolumeCount() + + // Push vid 1 past the limit (effective = 4000 + 6000 = 10000). + if !vl.RecordAssign(1, 6000) { + t.Fatalf("RecordAssign should return true at the limit") + } + vl.AdjustActiveVolumeCountForFull(1) + + w, _ := vl.GetWritableVolumeCount() + if w != initialWritables-1 { + t.Fatalf("expected %d writables after eager removal, got %d", initialWritables-1, w) + } + + // Before the recovery delay, a heartbeat that lets decay run should + // *not* re-add the volume (even though effectiveSize would now be + // under the threshold). + advanceSizeTrackingClock(vl, 1, 3*time.Second) // past the 2s dedup window, but before 30s delay + if vl.UpdateVolumeSize(1, 4000, 0) { + t.Fatalf("recovery should not fire before capacityRecoveryDelay") + } + w, _ = vl.GetWritableVolumeCount() + if w != initialWritables-1 { + t.Errorf("writable count should not change before delay, got %d", w) + } + + // Now skip past the recovery delay. Decay the gap further until + // effectiveSize drops below the crowded threshold (9000). + for i := 0; i < 6; i++ { + advanceSizeTrackingClock(vl, 1, 10*time.Second) + recovered := vl.UpdateVolumeSize(1, 4000, 0) + if recovered { + vl.AdjustActiveVolumeCountAfterRecovery(1) + break + } + } + + w, _ = vl.GetWritableVolumeCount() + if w != initialWritables { + t.Errorf("expected recovery to restore %d writables, got %d", initialWritables, w) + } + if got := topo.diskUsages.usages[types.HardDriveType].activeVolumeCount; got != initialActive { + t.Errorf("expected activeVolumeCount restored to %d, got %d", initialActive, got) + } + + // fullSince should have been cleared so a subsequent heartbeat doesn't + // try to recover again. + advanceSizeTrackingClock(vl, 1, 60*time.Second) + if vl.UpdateVolumeSize(1, 4000, 0) { + t.Errorf("recovery should not re-fire after the volume is already writable") + } +} + +func TestUpdateVolumeSizeNoRecoveryWhenDiskStillOversized(t *testing.T) { + // Volume reported at 95% of limit and pushed past limit by RecordAssign. + // Even after the recovery delay and decay, reportedSize remains >= limit + // (real on-disk size is over limit), so recovery must not fire. + layout := ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":9500, "replication":"000"} + ], + "limit":10 + } + } + } +} +` + _, vl := setupPickTest(t, layout, 10000) + + if !vl.RecordAssign(1, 500) { + t.Fatalf("RecordAssign should hit the limit (9500 + 500 = 10000)") + } + vl.AdjustActiveVolumeCountForFull(1) + + // Plenty of time elapsed — but reported stays at 10500 (over limit). + for i := 0; i < 5; i++ { + advanceSizeTrackingClock(vl, 1, 10*time.Second) + if vl.UpdateVolumeSize(1, 10500, 0) { + t.Fatalf("recovery must not fire when reported >= limit") + } + } + w, _ := vl.GetWritableVolumeCount() + if w != 0 { + t.Errorf("expected 0 writables (volume legitimately full), got %d", w) + } +} + func TestHeartbeatDecaysPendingSize(t *testing.T) { layout := ` {