diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 11239420e..ba90b8dae 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "math/rand/v2" "slices" "sync" @@ -319,6 +320,11 @@ func (t *Topology) NextVolumeId() (needle.VolumeId, error) { return next, nil } +// EstimatedNeedleSizeBytes is the assumed size per assigned file ID, used to +// estimate pending bytes between heartbeats. Intentionally coarse — it only +// needs to spread load, not be precise. +const EstimatedNeedleSizeBytes = 1024 * 1024 // 1 MB + func (t *Topology) PickForWrite(requestedCount uint64, option *VolumeGrowOption, volumeLayout *VolumeLayout) (fileId string, count uint64, volumeLocationList *VolumeLocationList, shouldGrow bool, err error) { var vid needle.VolumeId vid, count, volumeLocationList, shouldGrow, err = volumeLayout.PickForWrite(requestedCount, option) @@ -328,6 +334,10 @@ func (t *Topology) PickForWrite(requestedCount uint64, option *VolumeGrowOption, if volumeLocationList == nil || volumeLocationList.Length() == 0 { return "", 0, nil, shouldGrow, fmt.Errorf("%s available for collection:%s replication:%s ttl:%s", NoWritableVolumes, option.Collection, option.ReplicaPlacement.String(), option.Ttl.String()) } + // Track estimated assigned bytes to spread load between heartbeats. + // Compute in uint64 and cap to avoid overflow on the int64 cast. + pendingBytes := min(uint64(count)*EstimatedNeedleSizeBytes, uint64(math.MaxInt64)) + volumeLayout.RecordAssign(vid, int64(pendingBytes)) nextFileId := t.Sequence.NextFileId(requestedCount) fileId = needle.NewFileId(vid, nextFileId, rand.Uint32()).String() return fileId, count, volumeLocationList, shouldGrow, nil @@ -485,6 +495,15 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) vl.EnsureCorrectWritables(&v) } + // Update effective sizes for all reported volumes (decay pending estimates) + for _, v := range volumeInfos { + if v.ReplicaPlacement == nil { + continue + } + diskType := types.ToDiskType(v.DiskType) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) + vl.UpdateVolumeSize(v.Id, v.Size) + } return } diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 97a3f9558..938592113 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -116,7 +116,9 @@ type VolumeLayout struct { vacuumedVolumes map[needle.VolumeId]time.Time volumeSizeLimit uint64 replicationAsMin bool - accessLock sync.RWMutex + accessLock sync.RWMutex + vid2size map[needle.VolumeId]uint64 // effective size: reported + pending + vid2reportedSize map[needle.VolumeId]uint64 // last heartbeat-reported size (dedup replicas) } type VolumeLayoutStats struct { @@ -138,6 +140,8 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType vacuumedVolumes: make(map[needle.VolumeId]time.Time), volumeSizeLimit: volumeSizeLimit, replicationAsMin: replicationAsMin, + vid2size: make(map[needle.VolumeId]uint64), + vid2reportedSize: make(map[needle.VolumeId]uint64), } } @@ -155,6 +159,11 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.vid2location[v.Id] = NewVolumeLocationList() } vl.vid2location[v.Id].Set(dn) + // For new volumes, initialize vid2size from reported size. + if _, exists := vl.vid2size[v.Id]; !exists { + vl.vid2size[v.Id] = v.Size + vl.vid2reportedSize[v.Id] = v.Size + } // glog.V(4).Infof("volume %d added to %s len %d copy %d", v.Id, dn.Id(), vl.vid2location[v.Id].Length(), v.ReplicaPlacement.GetCopyCount()) for _, dn := range vl.vid2location[v.Id].list { if vInfo, err := dn.GetVolumesById(v.Id); err == nil { @@ -184,6 +193,30 @@ func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo, dn *DataN } } +// UpdateVolumeSize is called on every heartbeat for every reported volume. +// It decays the pending size estimate toward the reported size and updates +// crowded state. Replicated volumes report from multiple DataNodes; decay +// runs only once per new reported size to avoid double-halving. +func (vl *VolumeLayout) UpdateVolumeSize(vid needle.VolumeId, reportedSize uint64) { + vl.accessLock.Lock() + defer vl.accessLock.Unlock() + + if reportedSize == vl.vid2reportedSize[vid] { + return // same size from another replica in this cycle + } + vl.vid2reportedSize[vid] = reportedSize + if prev := vl.vid2size[vid]; prev > reportedSize { + vl.vid2size[vid] = reportedSize + (prev-reportedSize)/2 + } else { + vl.vid2size[vid] = reportedSize + } + if float64(vl.vid2size[vid]) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold { + vl.setVolumeCrowded(vid) + } else { + vl.removeFromCrowded(vid) + } +} + func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.accessLock.Lock() defer vl.accessLock.Unlock() @@ -202,6 +235,8 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if location.Length() == 0 { delete(vl.vid2location, v.Id) + delete(vl.vid2size, v.Id) + delete(vl.vid2reportedSize, v.Id) vl.removeFromCrowded(v.Id) } @@ -260,6 +295,20 @@ func (vl *VolumeLayout) isCrowdedVolume(v *storage.VolumeInfo) bool { return float64(v.Size) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold } +// RecordAssign adds the estimated byte size to the volume's tracked effective +// size and marks it crowded if it crosses the threshold. +func (vl *VolumeLayout) RecordAssign(vid needle.VolumeId, pendingDelta int64) { + vl.accessLock.Lock() + defer vl.accessLock.Unlock() + + if pendingDelta > 0 { + vl.vid2size[vid] += uint64(pendingDelta) + } + if float64(vl.vid2size[vid]) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold { + vl.setVolumeCrowded(vid) + } +} + func (vl *VolumeLayout) isEmpty() bool { vl.accessLock.RLock() defer vl.accessLock.RUnlock() @@ -296,23 +345,20 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi return 0, 0, nil, true, fmt.Errorf("%s", NoWritableVolumes) } if option.DataCenter == "" && option.Rack == "" && option.DataNode == "" { - vid := vl.writables[rand.IntN(lenWriters)] - locationList = vl.vid2location[vid] + vid, locationList = vl.pickWeightedByRemaining(vl.writables) if locationList == nil || len(locationList.list) == 0 { return 0, 0, nil, false, fmt.Errorf("Strangely vid %s is on no machine!", vid.String()) } return vid, count, locationList.Copy(), false, nil } - // clone vl.writables - writables := make([]needle.VolumeId, len(vl.writables)) - copy(writables, vl.writables) - // randomize the writables - rand.Shuffle(len(writables), func(i, j int) { - writables[i], writables[j] = writables[j], writables[i] - }) - - for _, writableVolumeId := range writables { + // Scan from a random offset to collect up to pickSampleSize matching + // candidates, avoiding a full scan + allocation in the common case. + var sample [pickSampleSize]needle.VolumeId + found := 0 + start := rand.IntN(lenWriters) + for i := 0; i < lenWriters && found < pickSampleSize; i++ { + writableVolumeId := vl.writables[(start+i)%lenWriters] volumeLocationList := vl.vid2location[writableVolumeId] for _, dn := range volumeLocationList.list { if option.DataCenter != "" && dn.GetDataCenter().Id() != NodeId(option.DataCenter) { @@ -324,11 +370,75 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) { continue } - vid, locationList, counter = writableVolumeId, volumeLocationList.Copy(), count - return + sample[found] = writableVolumeId + found++ + break } } - return vid, count, locationList, true, fmt.Errorf("%s in DataCenter:%v Rack:%v DataNode:%v", NoWritableVolumes, option.DataCenter, option.Rack, option.DataNode) + if found == 0 { + return vid, count, locationList, true, fmt.Errorf("%s in DataCenter:%v Rack:%v DataNode:%v", NoWritableVolumes, option.DataCenter, option.Rack, option.DataNode) + } + vid, locationList = vl.weightedPick(sample[:found]) + return vid, count, locationList.Copy(), false, nil +} + +// pickSampleSize is how many random candidates to sample before doing a +// weighted pick. Keeps cost O(1) regardless of total writable volume count +// while still biasing toward emptier volumes. +const pickSampleSize = 3 + +// pickWeightedByRemaining randomly samples a few candidates from the list, +// then does a weighted pick among them by remaining capacity. +// Sampled candidates may repeat when len(candidates) is small relative to +// pickSampleSize; this is harmless — a repeated volume just gets proportionally +// more weight, which is a negligible statistical effect. +func (vl *VolumeLayout) pickWeightedByRemaining(candidates []needle.VolumeId) (needle.VolumeId, *VolumeLocationList) { + n := len(candidates) + if n <= pickSampleSize { + return vl.weightedPick(candidates) + } + + var sample [pickSampleSize]needle.VolumeId + for i := range sample { + sample[i] = candidates[rand.IntN(n)] + } + return vl.weightedPick(sample[:]) +} + +func (vl *VolumeLayout) weightedPick(candidates []needle.VolumeId) (needle.VolumeId, *VolumeLocationList) { + if len(candidates) == 1 { + vid := candidates[0] + return vid, vl.vid2location[vid] + } + + // first pass: sum weights + var totalRemaining uint64 + for _, vid := range candidates { + totalRemaining += vl.remainingSize(vid) + } + + // second pass: weighted random pick + pick := rand.Uint64N(totalRemaining) + var cumulative uint64 + for _, vid := range candidates { + cumulative += vl.remainingSize(vid) + if pick < cumulative { + return vid, vl.vid2location[vid] + } + } + + vid := candidates[0] + return vid, vl.vid2location[vid] +} + +func (vl *VolumeLayout) remainingSize(vid needle.VolumeId) uint64 { + size := vl.vid2size[vid] + if size < vl.volumeSizeLimit { + if r := vl.volumeSizeLimit - size; r > 1 { + return r + } + } + return 1 } func (vl *VolumeLayout) HasGrowRequest() bool { @@ -372,8 +482,13 @@ func (vl *VolumeLayout) ShouldGrowVolumesByDcAndRack(writables *[]needle.VolumeI if !checkDcOnly && dn.GetRack().Id() != rackId { continue } - if info, err := dn.GetVolumesById(v); err == nil && !vl.isCrowdedVolume(&info) { - return false + if _, err := dn.GetVolumesById(v); err == nil { + vl.accessLock.RLock() + size := vl.vid2size[v] + vl.accessLock.RUnlock() + if float64(size) <= float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold { + return false + } } } } diff --git a/weed/topology/volume_layout_pick_bench_test.go b/weed/topology/volume_layout_pick_bench_test.go new file mode 100644 index 000000000..1a0c6335f --- /dev/null +++ b/weed/topology/volume_layout_pick_bench_test.go @@ -0,0 +1,134 @@ +package topology + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +var benchLayoutSmall = ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":2000, "replication":"000"}, + {"id":2, "size":5000, "replication":"000"}, + {"id":3, "size":8000, "replication":"000"} + ], + "limit":10 + } + } + } +} +` + +var benchLayoutMedium = ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":1000, "replication":"000"}, + {"id":2, "size":2000, "replication":"000"}, + {"id":3, "size":3000, "replication":"000"}, + {"id":4, "size":4000, "replication":"000"}, + {"id":5, "size":5000, "replication":"000"}, + {"id":6, "size":6000, "replication":"000"}, + {"id":7, "size":7000, "replication":"000"}, + {"id":8, "size":8000, "replication":"000"}, + {"id":9, "size":9000, "replication":"000"}, + {"id":10, "size":9500, "replication":"000"} + ], + "limit":20 + } + } + } +} +` + +var benchLayoutLarge = ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":500, "replication":"000"}, + {"id":2, "size":1000, "replication":"000"}, + {"id":3, "size":1500, "replication":"000"}, + {"id":4, "size":2000, "replication":"000"}, + {"id":5, "size":2500, "replication":"000"}, + {"id":6, "size":3000, "replication":"000"}, + {"id":7, "size":3500, "replication":"000"}, + {"id":8, "size":4000, "replication":"000"}, + {"id":9, "size":4500, "replication":"000"}, + {"id":10, "size":5000, "replication":"000"}, + {"id":11, "size":5500, "replication":"000"}, + {"id":12, "size":6000, "replication":"000"}, + {"id":13, "size":6500, "replication":"000"}, + {"id":14, "size":7000, "replication":"000"}, + {"id":15, "size":7500, "replication":"000"}, + {"id":16, "size":8000, "replication":"000"}, + {"id":17, "size":8500, "replication":"000"}, + {"id":18, "size":9000, "replication":"000"}, + {"id":19, "size":9200, "replication":"000"}, + {"id":20, "size":9500, "replication":"000"} + ], + "limit":30 + } + } + } +} +` + +func benchPickForWrite(b *testing.B, layout string, volumeSizeLimit uint64) { + topo := setupWithLimit(b, layout, volumeSizeLimit) + rp, _ := super_block.NewReplicaPlacementFromString("000") + vl := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL, types.HardDriveType) + option := &VolumeGrowOption{} + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + vl.PickForWrite(1, option) + } +} + +func BenchmarkPickForWrite_3Volumes(b *testing.B) { + benchPickForWrite(b, benchLayoutSmall, 10000) +} + +func BenchmarkPickForWrite_10Volumes(b *testing.B) { + benchPickForWrite(b, benchLayoutMedium, 10000) +} + +func BenchmarkPickForWrite_20Volumes(b *testing.B) { + benchPickForWrite(b, benchLayoutLarge, 10000) +} + +func benchPickForWriteConstrained(b *testing.B, layout string, volumeSizeLimit uint64) { + topo := setupWithLimit(b, layout, volumeSizeLimit) + rp, _ := super_block.NewReplicaPlacementFromString("000") + vl := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL, types.HardDriveType) + option := &VolumeGrowOption{DataCenter: "dc1"} + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + vl.PickForWrite(1, option) + } +} + +func BenchmarkPickForWriteConstrained_3Volumes(b *testing.B) { + benchPickForWriteConstrained(b, benchLayoutSmall, 10000) +} + +func BenchmarkPickForWriteConstrained_10Volumes(b *testing.B) { + benchPickForWriteConstrained(b, benchLayoutMedium, 10000) +} + +func BenchmarkPickForWriteConstrained_20Volumes(b *testing.B) { + benchPickForWriteConstrained(b, benchLayoutLarge, 10000) +} diff --git a/weed/topology/volume_layout_pick_test.go b/weed/topology/volume_layout_pick_test.go new file mode 100644 index 000000000..62860a87b --- /dev/null +++ b/weed/topology/volume_layout_pick_test.go @@ -0,0 +1,472 @@ +package topology + +import ( + "encoding/json" + "math" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/sequence" + "github.com/seaweedfs/seaweedfs/weed/storage" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +// setupWithLimit is like setup() but allows specifying the volumeSizeLimit +// so that VolumeLayouts are created with the correct limit from the start. +func setupWithLimit(t testing.TB, topologyLayout string, volumeSizeLimit uint64) *Topology { + t.Helper() + var data interface{} + if err := json.Unmarshal([]byte(topologyLayout), &data); err != nil { + t.Fatalf("setupWithLimit: json.Unmarshal: %v", err) + } + mTopology, ok := data.(map[string]interface{}) + if !ok { + t.Fatalf("setupWithLimit: expected map[string]interface{}, got %T", data) + } + + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), volumeSizeLimit, 5, false) + for dcKey, dcValue := range mTopology { + dc := NewDataCenter(dcKey) + dcMap := dcValue.(map[string]interface{}) + topo.LinkChildNode(dc) + for rackKey, rackValue := range dcMap { + dcRack := NewRack(rackKey) + rackMap := rackValue.(map[string]interface{}) + dc.LinkChildNode(dcRack) + for serverKey, serverValue := range rackMap { + server := NewDataNode(serverKey) + serverMap := serverValue.(map[string]interface{}) + if ip, ok := serverMap["ip"]; ok { + server.Ip = ip.(string) + } + dcRack.LinkChildNode(server) + for _, v := range serverMap["volumes"].([]interface{}) { + m := v.(map[string]interface{}) + vi := storage.VolumeInfo{ + Id: needle.VolumeId(int64(m["id"].(float64))), + Size: uint64(m["size"].(float64)), + Version: needle.GetCurrentVersion(), + } + if mVal, ok := m["collection"]; ok { + vi.Collection = mVal.(string) + } + if mVal, ok := m["replication"]; ok { + rp, _ := super_block.NewReplicaPlacementFromString(mVal.(string)) + vi.ReplicaPlacement = rp + } + if vi.ReplicaPlacement != nil { + vl := topo.GetVolumeLayout(vi.Collection, vi.ReplicaPlacement, needle.EMPTY_TTL, types.HardDriveType) + vl.RegisterVolume(&vi, server) + vl.setVolumeWritable(vi.Id) + } + server.AddOrUpdateVolume(vi) + } + disk := server.getOrCreateDisk("") + disk.UpAdjustDiskUsageDelta("", &DiskUsageCounts{ + maxVolumeCount: int64(serverMap["limit"].(float64)), + }) + } + } + } + return topo +} + +func setupPickTest(t testing.TB, layout string, volumeSizeLimit uint64) (*Topology, *VolumeLayout) { + t.Helper() + topo := setupWithLimit(t, layout, volumeSizeLimit) + rp, _ := super_block.NewReplicaPlacementFromString("000") + vl := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL, types.HardDriveType) + return topo, vl +} + +func TestPickForWriteWeightedDistribution(t *testing.T) { + // 3 volumes at 20%, 50%, 80% full (sizes 2000, 5000, 8000 of limit 10000) + // remaining: 8000, 5000, 2000 => ratios ~53%, 33%, 13% + layout := ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":2000, "replication":"000"}, + {"id":2, "size":5000, "replication":"000"}, + {"id":3, "size":8000, "replication":"000"} + ], + "limit":10 + } + } + } +} +` + _, vl := setupPickTest(t, layout,10000) + + counts := make(map[needle.VolumeId]int) + option := &VolumeGrowOption{} + n := 60000 + + for i := 0; i < n; i++ { + vid, _, _, _, err := vl.PickForWrite(1, option) + if err != nil { + t.Fatalf("PickForWrite: %v", err) + } + counts[vid]++ + } + + // vid 1 (remaining 8000) > vid 2 (remaining 5000) > vid 3 (remaining 2000) + if counts[1] <= counts[3] { + t.Errorf("expected vid 1 picked more than vid 3: vid1=%d, vid3=%d", counts[1], counts[3]) + } + if counts[2] <= counts[3] { + t.Errorf("expected vid 2 picked more than vid 3: vid2=%d, vid3=%d", counts[2], counts[3]) + } + + // Check proportions: expected 8000/5000/2000 out of 15000 + expected := map[needle.VolumeId]float64{ + 1: 8000.0 / 15000.0, + 2: 5000.0 / 15000.0, + 3: 2000.0 / 15000.0, + } + for vid, expectedPct := range expected { + actualPct := float64(counts[vid]) / float64(n) + if math.Abs(actualPct-expectedPct) > 0.03 { + t.Errorf("vid %d: expected ~%.1f%%, got %.1f%%", vid, expectedPct*100, actualPct*100) + } + } +} + +func TestPickForWriteWithPendingSize(t *testing.T) { + layout := ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":1000, "replication":"000"}, + {"id":2, "size":1000, "replication":"000"} + ], + "limit":10 + } + } + } +} +` + _, vl := setupPickTest(t, layout,10000) + + // Add large pending to vid 1, making it effectively 9000/10000 + vl.RecordAssign(1, 8000) + + counts := make(map[needle.VolumeId]int) + option := &VolumeGrowOption{} + n := 10000 + + for i := 0; i < n; i++ { + vid, _, _, _, err := vl.PickForWrite(1, option) + if err != nil { + t.Fatalf("PickForWrite: %v", err) + } + counts[vid]++ + } + + // vid 2 (remaining ~9000) should be picked much more than vid 1 (remaining ~1000) + ratio := float64(counts[2]) / float64(counts[1]) + if ratio < 3.0 { + t.Errorf("vid2/vid1 ratio %.2f expected >= 3.0 (vid1=%d, vid2=%d)", ratio, counts[1], counts[2]) + } +} + +func TestPickForWriteSingleWritable(t *testing.T) { + layout := ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":5000, "replication":"000"} + ], + "limit":10 + } + } + } +} +` + _, vl := setupPickTest(t, layout,10000) + + option := &VolumeGrowOption{} + for i := 0; i < 100; i++ { + vid, _, _, _, err := vl.PickForWrite(1, option) + if err != nil { + t.Fatalf("PickForWrite: %v", err) + } + if vid != 1 { + t.Fatalf("expected vid 1, got %d", vid) + } + } +} + +func TestPickForWriteAllNearFull(t *testing.T) { + layout := ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":9999, "replication":"000"}, + {"id":2, "size":9999, "replication":"000"} + ], + "limit":10 + } + } + } +} +` + _, vl := setupPickTest(t, layout,10000) + + option := &VolumeGrowOption{} + for i := 0; i < 100; i++ { + vid, _, _, _, err := vl.PickForWrite(1, option) + if err != nil { + t.Fatalf("PickForWrite: %v", err) + } + if vid != 1 && vid != 2 { + t.Fatalf("expected vid 1 or 2, got %d", vid) + } + } +} + +func TestPickForWriteConstrainedWeighted(t *testing.T) { + layout := ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "ip":"10.0.0.1", + "volumes":[ + {"id":1, "size":2000, "replication":"000"}, + {"id":2, "size":8000, "replication":"000"} + ], + "limit":10 + } + }, + "rack2":{ + "server2":{ + "ip":"10.0.0.2", + "volumes":[ + {"id":3, "size":5000, "replication":"000"} + ], + "limit":10 + } + } + } +} +` + _, vl := setupPickTest(t, layout,10000) + + counts := make(map[needle.VolumeId]int) + option := &VolumeGrowOption{DataCenter: "dc1"} + n := 20000 + + for i := 0; i < n; i++ { + vid, _, _, _, err := vl.PickForWrite(1, option) + if err != nil { + t.Fatalf("PickForWrite: %v", err) + } + counts[vid]++ + } + + // vid 1 (remaining 8000) should be picked most, vid 2 (remaining 2000) least + if counts[1] <= counts[2] { + t.Errorf("expected vid 1 picked more than vid 2: vid1=%d, vid2=%d", counts[1], counts[2]) + } +} + +func TestRecordAssignMarksCrowded(t *testing.T) { + layout := ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":8500, "replication":"000"} + ], + "limit":10 + } + } + } +} +` + _, vl := setupPickTest(t, layout,10000) + + // Volume at 85% — not crowded yet (threshold is 90%) + _, crowded := vl.GetWritableVolumeCount() + if crowded != 0 { + t.Fatalf("expected 0 crowded, got %d", crowded) + } + + // Add pending that pushes past 90% + vl.RecordAssign(1, 1000) + + _, crowded = vl.GetWritableVolumeCount() + if crowded != 1 { + t.Errorf("expected 1 crowded after pending push past 90%%, got %d", crowded) + } +} + +func TestHeartbeatDecaysPendingSize(t *testing.T) { + layout := ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":1000, "replication":"000"}, + {"id":2, "size":1000, "replication":"000"} + ], + "limit":10 + } + } + } +} +` + _, vl := setupPickTest(t, layout,10000) + + // vid2size starts at 1000 (reported). Add 8000 pending → 9000. + vl.RecordAssign(1, 8000) + + vl.accessLock.RLock() + if vl.vid2size[1] != 9000 { + t.Fatalf("expected vid2size=9000 after RecordAssign, got %d", vl.vid2size[1]) + } + vl.accessLock.RUnlock() + + // Heartbeat: volume server reports size=3000 (some writes landed). + // Old effective=9000, new reported=3000 → excess=6000 → decayed to 3000. + // So vid2size should become 3000 + 6000/2 = 6000, not just 3000. + vl.UpdateVolumeSize(1, 3000) + + vl.accessLock.RLock() + if vl.vid2size[1] != 6000 { + t.Errorf("expected vid2size=6000 after decay (3000 + 6000/2), got %d", vl.vid2size[1]) + } + vl.accessLock.RUnlock() + + // Second heartbeat: size=5000. Old effective=6000 → excess=1000 → decay to 500. + // vid2size should become 5000 + 1000/2 = 5500. + vl.UpdateVolumeSize(1, 5000) + + vl.accessLock.RLock() + if vl.vid2size[1] != 5500 { + t.Errorf("expected vid2size=5500 after second decay (5000 + 1000/2), got %d", vl.vid2size[1]) + } + vl.accessLock.RUnlock() + + // Third heartbeat: size=5500. Old effective=5500 → no excess. + // vid2size should be exactly 5500. + vl.UpdateVolumeSize(1, 5500) + + vl.accessLock.RLock() + if vl.vid2size[1] != 5500 { + t.Errorf("expected vid2size=5500 (no excess), got %d", vl.vid2size[1]) + } + vl.accessLock.RUnlock() + + // vid 2 (remaining 9000) should be picked more than vid 1 (remaining 4500) + counts := make(map[needle.VolumeId]int) + option := &VolumeGrowOption{} + for i := 0; i < 10000; i++ { + vid, _, _, _, err := vl.PickForWrite(1, option) + if err != nil { + t.Fatalf("PickForWrite: %v", err) + } + counts[vid]++ + } + if counts[2] <= counts[1] { + t.Errorf("vid 2 (remaining 9000) should be picked more than vid 1 (remaining 4500): vid1=%d, vid2=%d", counts[1], counts[2]) + } +} + +func TestHeartbeatDecayDedupReplicas(t *testing.T) { + // Volume 1 replicated on server1 and server2. + // Both servers report size=3000 in the same heartbeat cycle. + // Decay should run only once, not once per replica. + layout := ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "ip":"10.0.0.1", + "volumes":[ + {"id":1, "size":1000, "replication":"001"} + ], + "limit":10 + }, + "server2":{ + "ip":"10.0.0.2", + "volumes":[ + {"id":1, "size":1000, "replication":"001"} + ], + "limit":10 + } + } + } +} +` + topo := setupWithLimit(t, layout, 10000) + rp, _ := super_block.NewReplicaPlacementFromString("001") + vl := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL, types.HardDriveType) + + // Add pending: effective = 1000 + 8000 = 9000 + vl.RecordAssign(1, 8000) + + vl.accessLock.RLock() + if vl.vid2size[1] != 9000 { + t.Fatalf("expected vid2size=9000, got %d", vl.vid2size[1]) + } + vl.accessLock.RUnlock() + + // Both replicas report size=3000. Decay should happen once: 3000 + (9000-3000)/2 = 6000. + // Calling UpdateVolumeSize twice simulates two replicas reporting in the same cycle. + vl.UpdateVolumeSize(1, 3000) + vl.UpdateVolumeSize(1, 3000) // second replica, same size — should be a no-op + + vl.accessLock.RLock() + got := vl.vid2size[1] + vl.accessLock.RUnlock() + // Without dedup: would be 3000 + (6000-3000)/2 = 4500 (double decay). + // With dedup: should be 6000 (single decay). + if got != 6000 { + t.Errorf("expected vid2size=6000 (single decay), got %d (double decay would give 4500)", got) + } +} + +func TestShouldGrowVolumesByDcAndRack_WithPendingSize(t *testing.T) { + layout := ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "ip":"10.0.0.1", + "volumes":[ + {"id":1, "size":8500, "replication":"000"} + ], + "limit":10 + } + } + } +} +` + _, vl := setupPickTest(t, layout,10000) + + writables := vl.CloneWritableVolumes() + if vl.ShouldGrowVolumesByDcAndRack(&writables, "dc1", "rack1") { + t.Error("should not grow before pending makes volume crowded") + } + + // Add pending that pushes effective size past 9000 threshold + vl.RecordAssign(1, 600) + + if !vl.ShouldGrowVolumesByDcAndRack(&writables, "dc1", "rack1") { + t.Error("should grow after pending pushes volume past crowded threshold") + } +} +