diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 30fbfc1f0..5c779f343 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -354,7 +354,9 @@ func (t *Topology) PickForWrite(requestedCount uint64, option *VolumeGrowOption, sizePerFile = expectedDataSize } pendingBytes := min(uint64(count)*sizePerFile, uint64(math.MaxInt64)) - volumeLayout.RecordAssign(vid, int64(pendingBytes)) + if volumeLayout.RecordAssign(vid, int64(pendingBytes)) { + volumeLayout.AdjustActiveVolumeCountForFull(vid) + } nextFileId := t.Sequence.NextFileId(requestedCount) fileId = needle.NewFileId(vid, nextFileId, rand.Uint32()).String() return fileId, count, volumeLocationList, shouldGrow, nil @@ -512,14 +514,18 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) vl.EnsureCorrectWritables(&v) } - // Update effective sizes for all reported volumes (decay pending estimates) + // Update effective sizes for all reported volumes (decay pending estimates). + // If decay brings a volume eagerly removed by RecordAssign back under the + // writable threshold, restore the matching activeVolumeCount. for _, v := range volumeInfos { if v.ReplicaPlacement == nil { continue } diskType := types.ToDiskType(v.DiskType) vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) - vl.UpdateVolumeSize(v.Id, v.Size, v.CompactRevision) + if vl.UpdateVolumeSize(v.Id, v.Size, v.CompactRevision) { + vl.AdjustActiveVolumeCountAfterRecovery(v.Id) + } } return } diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index bc9c0c640..d0b30c2ca 100644 --- a/weed/topology/volume_growth_test.go +++ b/weed/topology/volume_growth_test.go @@ -433,7 +433,13 @@ func TestPickForWrite(t *testing.T) { continue } volumeGrowOption.DataNode = dn - fileId, count, _, shouldGrow, err := topo.PickForWrite(1, volumeGrowOption, vl, 0) + // Small expectedDataSize hint: this test iterates many times + // and the layout uses a 32KB volume size limit. With hint=0 + // (default 1MB pendingDelta), RecordAssign would exceed the + // limit on the first call and eagerly remove the volume + // from writable. Keep the hint tiny so effectiveSize stays + // below the limit across all iterations. + fileId, count, _, shouldGrow, err := topo.PickForWrite(1, volumeGrowOption, vl, 1024) if dc == "dc0" { if err == nil || count != 0 || !shouldGrow { fmt.Println(dc, r, dn, "pick for write should be with error") diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 39602502f..48e75ec82 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -108,8 +108,16 @@ type volumeSizeTracking struct { reportedSize uint64 // last heartbeat-reported size (dedup replicas) compactRevision uint32 // detect compaction to reset instead of decay lastUpdateTime time.Time // dedup replicas within the same heartbeat cycle + fullSince time.Time // non-zero while the volume is eagerly marked full by RecordAssign } +// capacityRecoveryDelay is the minimum time a volume must stay out of the +// writable list after being eagerly removed by RecordAssign before it can +// be considered for re-addition by heartbeat-driven decay. Combined with +// the effectiveSize hysteresis band, this avoids bouncing the volume in +// and out of writable within a single burst of assigns. +const capacityRecoveryDelay = 30 * time.Second + // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { growRequest atomic.Bool @@ -210,7 +218,12 @@ func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo, dn *DataN // If the compact revision changed, the size drop is from compaction (not // pending writes), so we reset effectiveSize to the reported size instead of // decaying. -func (vl *VolumeLayout) UpdateVolumeSize(vid needle.VolumeId, reportedSize uint64, compactRevision uint32) { +// +// Returns recoveredToWritable = true when decay brought a volume that was +// previously eagerly removed by RecordAssign back under the writable +// threshold and this call re-added it to the writable list. The caller +// should mirror the activeVolumeCount bookkeeping. +func (vl *VolumeLayout) UpdateVolumeSize(vid needle.VolumeId, reportedSize uint64, compactRevision uint32) (recoveredToWritable bool) { vl.accessLock.Lock() defer vl.accessLock.Unlock() @@ -225,7 +238,7 @@ func (vl *VolumeLayout) UpdateVolumeSize(vid needle.VolumeId, reportedSize uint6 } vl.sizeTracking[vid] = st } else if now.Sub(st.lastUpdateTime) < 2*time.Second { - return // duplicate replica in the same heartbeat cycle + return false // duplicate replica in the same heartbeat cycle } else { st.lastUpdateTime = now st.reportedSize = reportedSize @@ -240,11 +253,40 @@ func (vl *VolumeLayout) UpdateVolumeSize(vid needle.VolumeId, reportedSize uint6 } } - if float64(st.effectiveSize) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold { + crowdedThreshold := uint64(float64(vl.volumeSizeLimit) * VolumeGrowStrategy.Threshold) + if st.effectiveSize > crowdedThreshold { vl.setVolumeCrowded(vid) - } else { - vl.removeFromCrowded(vid) + return false } + vl.removeFromCrowded(vid) + + // Recovery path: if we eagerly removed this volume from writables in + // RecordAssign, decay may now have brought effectiveSize back under + // the crowded threshold. Re-add it — but only after the recovery + // delay has elapsed, so a steady stream of assigns near the limit + // does not bounce the volume in and out of writables. + if st.fullSince.IsZero() { + return false + } + if now.Sub(st.fullSince) < capacityRecoveryDelay { + return false + } + if reportedSize >= vl.volumeSizeLimit { + return false // actual on-disk size still over limit; stay out + } + if vl.oversizedVolumes.IsTrue(vid) { + return false + } + if !vl.enoughCopies(vid) || !vl.isAllWritable(vid) { + return false + } + if !vl.setVolumeWritable(vid) { + return false // already writable (shouldn't happen, but be safe) + } + st.fullSince = time.Time{} + glog.V(0).Infof("Volume %d recovered to writable (effective=%d, reported=%d, limit=%d).", + vid, st.effectiveSize, reportedSize, vl.volumeSizeLimit) + return true } func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { @@ -325,21 +367,83 @@ func (vl *VolumeLayout) isCrowdedVolume(v *storage.VolumeInfo) bool { } // RecordAssign adds the estimated byte size to the volume's tracked effective -// size and marks it crowded if it crosses the threshold. -func (vl *VolumeLayout) RecordAssign(vid needle.VolumeId, pendingDelta int64) { +// size and updates the volume's writable state: +// +// - at the crowded threshold (e.g. 90%): marks crowded to trigger growth. +// - at the hard limit (100%): removes the volume from the writable list +// immediately so new assigns stop landing on it. Returns true if this +// call was what removed the volume, so the caller can mirror the +// disk-usage accounting done by Topology.SetVolumeCapacityFull. +// +// Removing eagerly here avoids waiting for the heartbeat-driven +// CollectDeadNodeAndFullVolumes cycle (5–15s detection latency) during +// which a fast writer could push the volume far past the configured limit. +func (vl *VolumeLayout) RecordAssign(vid needle.VolumeId, pendingDelta int64) (reachedCapacity bool) { vl.accessLock.Lock() defer vl.accessLock.Unlock() st := vl.sizeTracking[vid] if st == nil { - return + return false } if pendingDelta > 0 { st.effectiveSize += uint64(pendingDelta) } + if st.effectiveSize >= vl.volumeSizeLimit { + if vl.removeFromWritable(vid) { + st.fullSince = time.Now() + glog.V(0).Infof("Volume %d reaches full capacity (effective=%d, limit=%d).", + vid, st.effectiveSize, vl.volumeSizeLimit) + return true + } + return false + } if float64(st.effectiveSize) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold { vl.setVolumeCrowded(vid) } + return false +} + +// AdjustActiveVolumeCountForFull decrements the active volume count on each +// data node holding this volume. Mirrors the accounting done in +// Topology.SetVolumeCapacityFull for the heartbeat-driven path. Call only +// after RecordAssign returns true for the same vid. +func (vl *VolumeLayout) AdjustActiveVolumeCountForFull(vid needle.VolumeId) { + vl.adjustActiveVolumeCount(vid, -1) +} + +// AdjustActiveVolumeCountAfterRecovery increments the active volume count on +// each data node holding this volume. Mirrors +// AdjustActiveVolumeCountForFull for the recovery path. Call only after +// UpdateVolumeSize returns true for the same vid. +func (vl *VolumeLayout) AdjustActiveVolumeCountAfterRecovery(vid needle.VolumeId) { + vl.adjustActiveVolumeCount(vid, +1) +} + +func (vl *VolumeLayout) adjustActiveVolumeCount(vid needle.VolumeId, delta int64) { + // Copy the node list under the VolumeLayout lock, then release it before + // calling UpAdjustDiskUsageDelta. UpAdjustDiskUsageDelta walks up the + // topology tree taking per-level locks (e.g., DiskUsages.Lock on each + // node). Keeping vl.accessLock held across that tree walk is an + // unnecessary lock-ordering hazard — other call paths that hold a + // topology-level lock and then need vl.accessLock would deadlock. + vl.accessLock.RLock() + vidLocations, found := vl.vid2location[vid] + if !found { + vl.accessLock.RUnlock() + return + } + nodes := make([]*DataNode, len(vidLocations.list)) + copy(nodes, vidLocations.list) + vl.accessLock.RUnlock() + + diskTypeStr := string(vl.diskType) + for _, dn := range nodes { + disk := dn.getOrCreateDisk(diskTypeStr) + disk.UpAdjustDiskUsageDelta(vl.diskType, &DiskUsageCounts{ + activeVolumeCount: delta, + }) + } } const maxDrainWait = 30 * time.Second diff --git a/weed/topology/volume_layout_pick_test.go b/weed/topology/volume_layout_pick_test.go index f63878c9e..ce13c6c22 100644 --- a/weed/topology/volume_layout_pick_test.go +++ b/weed/topology/volume_layout_pick_test.go @@ -313,6 +313,191 @@ func TestRecordAssignMarksCrowded(t *testing.T) { } } +func TestRecordAssignReachingCapacityRemovesFromWritable(t *testing.T) { + layout := ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":5000, "replication":"000"}, + {"id":2, "size":5000, "replication":"000"} + ], + "limit":10 + } + } + } +} +` + topo, vl := setupPickTest(t, layout, 10000) + + writable, _ := vl.GetWritableVolumeCount() + if writable != 2 { + t.Fatalf("expected 2 writable volumes initially, got %d", writable) + } + // Each volume counts as active initially. + initialActive := topo.diskUsages.usages[types.HardDriveType].activeVolumeCount + + // Push vid 1 past the hard limit (5000 + 5000 = 10000 == limit). + reachedCapacity := vl.RecordAssign(1, 5000) + if !reachedCapacity { + t.Fatalf("RecordAssign should return true when effectiveSize reaches limit") + } + vl.AdjustActiveVolumeCountForFull(1) + + writable, _ = vl.GetWritableVolumeCount() + if writable != 1 { + t.Errorf("expected 1 writable after eager removal, got %d", writable) + } + // activeVolumeCount should be decremented for the data node holding vid 1. + afterActive := topo.diskUsages.usages[types.HardDriveType].activeVolumeCount + if afterActive != initialActive-1 { + t.Errorf("expected activeVolumeCount=%d, got %d", initialActive-1, afterActive) + } + + // A second RecordAssign on the already-removed volume should not return + // true again (no double accounting). + if vl.RecordAssign(1, 10000) { + t.Errorf("RecordAssign should not report reachedCapacity twice for the same removal") + } + afterSecond := topo.diskUsages.usages[types.HardDriveType].activeVolumeCount + if afterSecond != afterActive { + t.Errorf("activeVolumeCount changed on second RecordAssign: before=%d after=%d", afterActive, afterSecond) + } +} + +// advanceSizeTrackingClock backdates a volume's time-sensitive fields by d +// so heartbeat decay and the recovery delay fire on the next update. +func advanceSizeTrackingClock(vl *VolumeLayout, vid needle.VolumeId, d time.Duration) { + vl.accessLock.Lock() + defer vl.accessLock.Unlock() + st := vl.sizeTracking[vid] + if st == nil { + return + } + if !st.lastUpdateTime.IsZero() { + st.lastUpdateTime = st.lastUpdateTime.Add(-d) + } + if !st.fullSince.IsZero() { + st.fullSince = st.fullSince.Add(-d) + } +} + +func TestUpdateVolumeSizeRecoversEagerlyRemovedVolume(t *testing.T) { + // Two writable volumes, each at 40% of a 10000-byte limit. Push vid 1 + // past the hard limit via RecordAssign, then heartbeat with an + // unchanged reported size so decay shrinks effectiveSize below the + // crowded threshold (90% of limit). After the recovery delay, the + // volume should be re-added to writables and activeVolumeCount + // restored. + layout := ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":4000, "replication":"000"}, + {"id":2, "size":4000, "replication":"000"} + ], + "limit":10 + } + } + } +} +` + topo, vl := setupPickTest(t, layout, 10000) + + initialActive := topo.diskUsages.usages[types.HardDriveType].activeVolumeCount + initialWritables, _ := vl.GetWritableVolumeCount() + + // Push vid 1 past the limit (effective = 4000 + 6000 = 10000). + if !vl.RecordAssign(1, 6000) { + t.Fatalf("RecordAssign should return true at the limit") + } + vl.AdjustActiveVolumeCountForFull(1) + + w, _ := vl.GetWritableVolumeCount() + if w != initialWritables-1 { + t.Fatalf("expected %d writables after eager removal, got %d", initialWritables-1, w) + } + + // Before the recovery delay, a heartbeat that lets decay run should + // *not* re-add the volume (even though effectiveSize would now be + // under the threshold). + advanceSizeTrackingClock(vl, 1, 3*time.Second) // past the 2s dedup window, but before 30s delay + if vl.UpdateVolumeSize(1, 4000, 0) { + t.Fatalf("recovery should not fire before capacityRecoveryDelay") + } + w, _ = vl.GetWritableVolumeCount() + if w != initialWritables-1 { + t.Errorf("writable count should not change before delay, got %d", w) + } + + // Now skip past the recovery delay. Decay the gap further until + // effectiveSize drops below the crowded threshold (9000). + for i := 0; i < 6; i++ { + advanceSizeTrackingClock(vl, 1, 10*time.Second) + recovered := vl.UpdateVolumeSize(1, 4000, 0) + if recovered { + vl.AdjustActiveVolumeCountAfterRecovery(1) + break + } + } + + w, _ = vl.GetWritableVolumeCount() + if w != initialWritables { + t.Errorf("expected recovery to restore %d writables, got %d", initialWritables, w) + } + if got := topo.diskUsages.usages[types.HardDriveType].activeVolumeCount; got != initialActive { + t.Errorf("expected activeVolumeCount restored to %d, got %d", initialActive, got) + } + + // fullSince should have been cleared so a subsequent heartbeat doesn't + // try to recover again. + advanceSizeTrackingClock(vl, 1, 60*time.Second) + if vl.UpdateVolumeSize(1, 4000, 0) { + t.Errorf("recovery should not re-fire after the volume is already writable") + } +} + +func TestUpdateVolumeSizeNoRecoveryWhenDiskStillOversized(t *testing.T) { + // Volume reported at 95% of limit and pushed past limit by RecordAssign. + // Even after the recovery delay and decay, reportedSize remains >= limit + // (real on-disk size is over limit), so recovery must not fire. + layout := ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":9500, "replication":"000"} + ], + "limit":10 + } + } + } +} +` + _, vl := setupPickTest(t, layout, 10000) + + if !vl.RecordAssign(1, 500) { + t.Fatalf("RecordAssign should hit the limit (9500 + 500 = 10000)") + } + vl.AdjustActiveVolumeCountForFull(1) + + // Plenty of time elapsed — but reported stays at 10500 (over limit). + for i := 0; i < 5; i++ { + advanceSizeTrackingClock(vl, 1, 10*time.Second) + if vl.UpdateVolumeSize(1, 10500, 0) { + t.Fatalf("recovery must not fire when reported >= limit") + } + } + w, _ := vl.GetWritableVolumeCount() + if w != 0 { + t.Errorf("expected 0 writables (volume legitimately full), got %d", w) + } +} + func TestHeartbeatDecaysPendingSize(t *testing.T) { layout := ` {