feat(master): size-aware volume assignment with weighted selection (#9031)

* feat(master): size-aware volume assignment with weighted selection

PickForWrite now selects volumes proportional to remaining capacity
instead of uniform random, so emptier volumes receive more writes.

- Add vid2size map to VolumeLayout tracking effective volume sizes
- Weighted pick via random sampling (k=3) for O(1) cost
- RecordAssign tracks estimated pending bytes between heartbeats
- Exponential decay on heartbeat: halve excess each cycle
- Proactive crowded detection using effective size
- Zero extra heap allocations on the unconstrained hot path

Benchmark (20 writable volumes, unconstrained):
  Before: 36 ns/op, 32 B/op, 2 allocs/op
  After:  85 ns/op, 32 B/op, 2 allocs/op

* fix: address review feedback on size-aware assignment

- RecordAssign: use write lock (Lock) instead of read lock (RLock)
  since it mutates vid2size map and crowded set
- RegisterVolume: clear crowded flag when heartbeat decay drops
  effective size below the threshold
- pickWeightedByRemaining: fix misleading Fisher-Yates comment,
  simplify to plain random sampling (duplicates are harmless)
- ShouldGrowVolumesByDcAndRack: read vid2size under RLock

* fix: decay once per heartbeat cycle, not per replica

RegisterVolume is called once per replica of a volume. For replicated
volumes, the pending size decay was running multiple times per heartbeat
cycle, reducing the excess by 75% instead of 50% (for 2 replicas).

Fix: track vid2reportedSize and only run decay when the heartbeat-
reported size actually changes. A second replica reporting the same
size in the same cycle is a no-op.

Also fix CodeQL alert: cap count*EstimatedNeedleSizeBytes to avoid
uint64→int64 overflow in RecordAssign call.

* Potential fix for pull request finding 'CodeQL / Incorrect conversion between integer types'

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>

* fix: fail fast in test setup on JSON errors

- setupWithLimit now takes testing.TB and calls t.Fatalf on unmarshal
  errors or type assertion failures instead of printing and continuing
- benchSetup removed; benchmarks reuse setupWithLimit directly

* fix: run size decay on every heartbeat, not just new volumes

RegisterVolume is only called for newly discovered volumes, not on
every heartbeat. The pending size decay was never running in production.

- Extract decay logic into UpdateVolumeSize(), called from
  SyncDataNodeRegistration for every reported volume on every heartbeat
- RegisterVolume only initializes vid2size for brand-new volumes
- Constrained PickForWrite: scan from random offset, collect up to
  pickSampleSize matches in a stack array (no append allocation)
- Tests now exercise UpdateVolumeSize directly instead of RegisterVolume
  to match the production heartbeat path

* fix: compute pending bytes in uint64 to satisfy CodeQL

---------

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
This commit is contained in:
Chris Lu
2026-04-11 09:19:05 -07:00
committed by GitHub
parent 388cc018ab
commit e2c79af6ec
4 changed files with 757 additions and 17 deletions

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"math/rand/v2"
"slices"
"sync"
@@ -319,6 +320,11 @@ func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
return next, nil
}
// EstimatedNeedleSizeBytes is the assumed size per assigned file ID, used to
// estimate pending bytes between heartbeats. Intentionally coarse — it only
// needs to spread load, not be precise.
const EstimatedNeedleSizeBytes = 1024 * 1024 // 1 MB
func (t *Topology) PickForWrite(requestedCount uint64, option *VolumeGrowOption, volumeLayout *VolumeLayout) (fileId string, count uint64, volumeLocationList *VolumeLocationList, shouldGrow bool, err error) {
var vid needle.VolumeId
vid, count, volumeLocationList, shouldGrow, err = volumeLayout.PickForWrite(requestedCount, option)
@@ -328,6 +334,10 @@ func (t *Topology) PickForWrite(requestedCount uint64, option *VolumeGrowOption,
if volumeLocationList == nil || volumeLocationList.Length() == 0 {
return "", 0, nil, shouldGrow, fmt.Errorf("%s available for collection:%s replication:%s ttl:%s", NoWritableVolumes, option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
}
// Track estimated assigned bytes to spread load between heartbeats.
// Compute in uint64 and cap to avoid overflow on the int64 cast.
pendingBytes := min(uint64(count)*EstimatedNeedleSizeBytes, uint64(math.MaxInt64))
volumeLayout.RecordAssign(vid, int64(pendingBytes))
nextFileId := t.Sequence.NextFileId(requestedCount)
fileId = needle.NewFileId(vid, nextFileId, rand.Uint32()).String()
return fileId, count, volumeLocationList, shouldGrow, nil
@@ -485,6 +495,15 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
vl.EnsureCorrectWritables(&v)
}
// Update effective sizes for all reported volumes (decay pending estimates)
for _, v := range volumeInfos {
if v.ReplicaPlacement == nil {
continue
}
diskType := types.ToDiskType(v.DiskType)
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
vl.UpdateVolumeSize(v.Id, v.Size)
}
return
}

View File

@@ -116,7 +116,9 @@ type VolumeLayout struct {
vacuumedVolumes map[needle.VolumeId]time.Time
volumeSizeLimit uint64
replicationAsMin bool
accessLock sync.RWMutex
accessLock sync.RWMutex
vid2size map[needle.VolumeId]uint64 // effective size: reported + pending
vid2reportedSize map[needle.VolumeId]uint64 // last heartbeat-reported size (dedup replicas)
}
type VolumeLayoutStats struct {
@@ -138,6 +140,8 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType
vacuumedVolumes: make(map[needle.VolumeId]time.Time),
volumeSizeLimit: volumeSizeLimit,
replicationAsMin: replicationAsMin,
vid2size: make(map[needle.VolumeId]uint64),
vid2reportedSize: make(map[needle.VolumeId]uint64),
}
}
@@ -155,6 +159,11 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.vid2location[v.Id] = NewVolumeLocationList()
}
vl.vid2location[v.Id].Set(dn)
// For new volumes, initialize vid2size from reported size.
if _, exists := vl.vid2size[v.Id]; !exists {
vl.vid2size[v.Id] = v.Size
vl.vid2reportedSize[v.Id] = v.Size
}
// glog.V(4).Infof("volume %d added to %s len %d copy %d", v.Id, dn.Id(), vl.vid2location[v.Id].Length(), v.ReplicaPlacement.GetCopyCount())
for _, dn := range vl.vid2location[v.Id].list {
if vInfo, err := dn.GetVolumesById(v.Id); err == nil {
@@ -184,6 +193,30 @@ func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo, dn *DataN
}
}
// UpdateVolumeSize is called on every heartbeat for every reported volume.
// It decays the pending size estimate toward the reported size and updates
// crowded state. Replicated volumes report from multiple DataNodes; decay
// runs only once per new reported size to avoid double-halving.
func (vl *VolumeLayout) UpdateVolumeSize(vid needle.VolumeId, reportedSize uint64) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
if reportedSize == vl.vid2reportedSize[vid] {
return // same size from another replica in this cycle
}
vl.vid2reportedSize[vid] = reportedSize
if prev := vl.vid2size[vid]; prev > reportedSize {
vl.vid2size[vid] = reportedSize + (prev-reportedSize)/2
} else {
vl.vid2size[vid] = reportedSize
}
if float64(vl.vid2size[vid]) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold {
vl.setVolumeCrowded(vid)
} else {
vl.removeFromCrowded(vid)
}
}
func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
@@ -202,6 +235,8 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
if location.Length() == 0 {
delete(vl.vid2location, v.Id)
delete(vl.vid2size, v.Id)
delete(vl.vid2reportedSize, v.Id)
vl.removeFromCrowded(v.Id)
}
@@ -260,6 +295,20 @@ func (vl *VolumeLayout) isCrowdedVolume(v *storage.VolumeInfo) bool {
return float64(v.Size) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold
}
// RecordAssign adds the estimated byte size to the volume's tracked effective
// size and marks it crowded if it crosses the threshold.
func (vl *VolumeLayout) RecordAssign(vid needle.VolumeId, pendingDelta int64) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
if pendingDelta > 0 {
vl.vid2size[vid] += uint64(pendingDelta)
}
if float64(vl.vid2size[vid]) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold {
vl.setVolumeCrowded(vid)
}
}
func (vl *VolumeLayout) isEmpty() bool {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
@@ -296,23 +345,20 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi
return 0, 0, nil, true, fmt.Errorf("%s", NoWritableVolumes)
}
if option.DataCenter == "" && option.Rack == "" && option.DataNode == "" {
vid := vl.writables[rand.IntN(lenWriters)]
locationList = vl.vid2location[vid]
vid, locationList = vl.pickWeightedByRemaining(vl.writables)
if locationList == nil || len(locationList.list) == 0 {
return 0, 0, nil, false, fmt.Errorf("Strangely vid %s is on no machine!", vid.String())
}
return vid, count, locationList.Copy(), false, nil
}
// clone vl.writables
writables := make([]needle.VolumeId, len(vl.writables))
copy(writables, vl.writables)
// randomize the writables
rand.Shuffle(len(writables), func(i, j int) {
writables[i], writables[j] = writables[j], writables[i]
})
for _, writableVolumeId := range writables {
// Scan from a random offset to collect up to pickSampleSize matching
// candidates, avoiding a full scan + allocation in the common case.
var sample [pickSampleSize]needle.VolumeId
found := 0
start := rand.IntN(lenWriters)
for i := 0; i < lenWriters && found < pickSampleSize; i++ {
writableVolumeId := vl.writables[(start+i)%lenWriters]
volumeLocationList := vl.vid2location[writableVolumeId]
for _, dn := range volumeLocationList.list {
if option.DataCenter != "" && dn.GetDataCenter().Id() != NodeId(option.DataCenter) {
@@ -324,11 +370,75 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi
if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
continue
}
vid, locationList, counter = writableVolumeId, volumeLocationList.Copy(), count
return
sample[found] = writableVolumeId
found++
break
}
}
return vid, count, locationList, true, fmt.Errorf("%s in DataCenter:%v Rack:%v DataNode:%v", NoWritableVolumes, option.DataCenter, option.Rack, option.DataNode)
if found == 0 {
return vid, count, locationList, true, fmt.Errorf("%s in DataCenter:%v Rack:%v DataNode:%v", NoWritableVolumes, option.DataCenter, option.Rack, option.DataNode)
}
vid, locationList = vl.weightedPick(sample[:found])
return vid, count, locationList.Copy(), false, nil
}
// pickSampleSize is how many random candidates to sample before doing a
// weighted pick. Keeps cost O(1) regardless of total writable volume count
// while still biasing toward emptier volumes.
const pickSampleSize = 3
// pickWeightedByRemaining randomly samples a few candidates from the list,
// then does a weighted pick among them by remaining capacity.
// Sampled candidates may repeat when len(candidates) is small relative to
// pickSampleSize; this is harmless — a repeated volume just gets proportionally
// more weight, which is a negligible statistical effect.
func (vl *VolumeLayout) pickWeightedByRemaining(candidates []needle.VolumeId) (needle.VolumeId, *VolumeLocationList) {
n := len(candidates)
if n <= pickSampleSize {
return vl.weightedPick(candidates)
}
var sample [pickSampleSize]needle.VolumeId
for i := range sample {
sample[i] = candidates[rand.IntN(n)]
}
return vl.weightedPick(sample[:])
}
func (vl *VolumeLayout) weightedPick(candidates []needle.VolumeId) (needle.VolumeId, *VolumeLocationList) {
if len(candidates) == 1 {
vid := candidates[0]
return vid, vl.vid2location[vid]
}
// first pass: sum weights
var totalRemaining uint64
for _, vid := range candidates {
totalRemaining += vl.remainingSize(vid)
}
// second pass: weighted random pick
pick := rand.Uint64N(totalRemaining)
var cumulative uint64
for _, vid := range candidates {
cumulative += vl.remainingSize(vid)
if pick < cumulative {
return vid, vl.vid2location[vid]
}
}
vid := candidates[0]
return vid, vl.vid2location[vid]
}
func (vl *VolumeLayout) remainingSize(vid needle.VolumeId) uint64 {
size := vl.vid2size[vid]
if size < vl.volumeSizeLimit {
if r := vl.volumeSizeLimit - size; r > 1 {
return r
}
}
return 1
}
func (vl *VolumeLayout) HasGrowRequest() bool {
@@ -372,8 +482,13 @@ func (vl *VolumeLayout) ShouldGrowVolumesByDcAndRack(writables *[]needle.VolumeI
if !checkDcOnly && dn.GetRack().Id() != rackId {
continue
}
if info, err := dn.GetVolumesById(v); err == nil && !vl.isCrowdedVolume(&info) {
return false
if _, err := dn.GetVolumesById(v); err == nil {
vl.accessLock.RLock()
size := vl.vid2size[v]
vl.accessLock.RUnlock()
if float64(size) <= float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold {
return false
}
}
}
}

View File

@@ -0,0 +1,134 @@
package topology
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
var benchLayoutSmall = `
{
"dc1":{
"rack1":{
"server1":{
"volumes":[
{"id":1, "size":2000, "replication":"000"},
{"id":2, "size":5000, "replication":"000"},
{"id":3, "size":8000, "replication":"000"}
],
"limit":10
}
}
}
}
`
var benchLayoutMedium = `
{
"dc1":{
"rack1":{
"server1":{
"volumes":[
{"id":1, "size":1000, "replication":"000"},
{"id":2, "size":2000, "replication":"000"},
{"id":3, "size":3000, "replication":"000"},
{"id":4, "size":4000, "replication":"000"},
{"id":5, "size":5000, "replication":"000"},
{"id":6, "size":6000, "replication":"000"},
{"id":7, "size":7000, "replication":"000"},
{"id":8, "size":8000, "replication":"000"},
{"id":9, "size":9000, "replication":"000"},
{"id":10, "size":9500, "replication":"000"}
],
"limit":20
}
}
}
}
`
var benchLayoutLarge = `
{
"dc1":{
"rack1":{
"server1":{
"volumes":[
{"id":1, "size":500, "replication":"000"},
{"id":2, "size":1000, "replication":"000"},
{"id":3, "size":1500, "replication":"000"},
{"id":4, "size":2000, "replication":"000"},
{"id":5, "size":2500, "replication":"000"},
{"id":6, "size":3000, "replication":"000"},
{"id":7, "size":3500, "replication":"000"},
{"id":8, "size":4000, "replication":"000"},
{"id":9, "size":4500, "replication":"000"},
{"id":10, "size":5000, "replication":"000"},
{"id":11, "size":5500, "replication":"000"},
{"id":12, "size":6000, "replication":"000"},
{"id":13, "size":6500, "replication":"000"},
{"id":14, "size":7000, "replication":"000"},
{"id":15, "size":7500, "replication":"000"},
{"id":16, "size":8000, "replication":"000"},
{"id":17, "size":8500, "replication":"000"},
{"id":18, "size":9000, "replication":"000"},
{"id":19, "size":9200, "replication":"000"},
{"id":20, "size":9500, "replication":"000"}
],
"limit":30
}
}
}
}
`
func benchPickForWrite(b *testing.B, layout string, volumeSizeLimit uint64) {
topo := setupWithLimit(b, layout, volumeSizeLimit)
rp, _ := super_block.NewReplicaPlacementFromString("000")
vl := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL, types.HardDriveType)
option := &VolumeGrowOption{}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
vl.PickForWrite(1, option)
}
}
func BenchmarkPickForWrite_3Volumes(b *testing.B) {
benchPickForWrite(b, benchLayoutSmall, 10000)
}
func BenchmarkPickForWrite_10Volumes(b *testing.B) {
benchPickForWrite(b, benchLayoutMedium, 10000)
}
func BenchmarkPickForWrite_20Volumes(b *testing.B) {
benchPickForWrite(b, benchLayoutLarge, 10000)
}
func benchPickForWriteConstrained(b *testing.B, layout string, volumeSizeLimit uint64) {
topo := setupWithLimit(b, layout, volumeSizeLimit)
rp, _ := super_block.NewReplicaPlacementFromString("000")
vl := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL, types.HardDriveType)
option := &VolumeGrowOption{DataCenter: "dc1"}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
vl.PickForWrite(1, option)
}
}
func BenchmarkPickForWriteConstrained_3Volumes(b *testing.B) {
benchPickForWriteConstrained(b, benchLayoutSmall, 10000)
}
func BenchmarkPickForWriteConstrained_10Volumes(b *testing.B) {
benchPickForWriteConstrained(b, benchLayoutMedium, 10000)
}
func BenchmarkPickForWriteConstrained_20Volumes(b *testing.B) {
benchPickForWriteConstrained(b, benchLayoutLarge, 10000)
}

View File

@@ -0,0 +1,472 @@
package topology
import (
"encoding/json"
"math"
"testing"
"github.com/seaweedfs/seaweedfs/weed/sequence"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
// setupWithLimit is like setup() but allows specifying the volumeSizeLimit
// so that VolumeLayouts are created with the correct limit from the start.
func setupWithLimit(t testing.TB, topologyLayout string, volumeSizeLimit uint64) *Topology {
t.Helper()
var data interface{}
if err := json.Unmarshal([]byte(topologyLayout), &data); err != nil {
t.Fatalf("setupWithLimit: json.Unmarshal: %v", err)
}
mTopology, ok := data.(map[string]interface{})
if !ok {
t.Fatalf("setupWithLimit: expected map[string]interface{}, got %T", data)
}
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), volumeSizeLimit, 5, false)
for dcKey, dcValue := range mTopology {
dc := NewDataCenter(dcKey)
dcMap := dcValue.(map[string]interface{})
topo.LinkChildNode(dc)
for rackKey, rackValue := range dcMap {
dcRack := NewRack(rackKey)
rackMap := rackValue.(map[string]interface{})
dc.LinkChildNode(dcRack)
for serverKey, serverValue := range rackMap {
server := NewDataNode(serverKey)
serverMap := serverValue.(map[string]interface{})
if ip, ok := serverMap["ip"]; ok {
server.Ip = ip.(string)
}
dcRack.LinkChildNode(server)
for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{})
vi := storage.VolumeInfo{
Id: needle.VolumeId(int64(m["id"].(float64))),
Size: uint64(m["size"].(float64)),
Version: needle.GetCurrentVersion(),
}
if mVal, ok := m["collection"]; ok {
vi.Collection = mVal.(string)
}
if mVal, ok := m["replication"]; ok {
rp, _ := super_block.NewReplicaPlacementFromString(mVal.(string))
vi.ReplicaPlacement = rp
}
if vi.ReplicaPlacement != nil {
vl := topo.GetVolumeLayout(vi.Collection, vi.ReplicaPlacement, needle.EMPTY_TTL, types.HardDriveType)
vl.RegisterVolume(&vi, server)
vl.setVolumeWritable(vi.Id)
}
server.AddOrUpdateVolume(vi)
}
disk := server.getOrCreateDisk("")
disk.UpAdjustDiskUsageDelta("", &DiskUsageCounts{
maxVolumeCount: int64(serverMap["limit"].(float64)),
})
}
}
}
return topo
}
func setupPickTest(t testing.TB, layout string, volumeSizeLimit uint64) (*Topology, *VolumeLayout) {
t.Helper()
topo := setupWithLimit(t, layout, volumeSizeLimit)
rp, _ := super_block.NewReplicaPlacementFromString("000")
vl := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL, types.HardDriveType)
return topo, vl
}
func TestPickForWriteWeightedDistribution(t *testing.T) {
// 3 volumes at 20%, 50%, 80% full (sizes 2000, 5000, 8000 of limit 10000)
// remaining: 8000, 5000, 2000 => ratios ~53%, 33%, 13%
layout := `
{
"dc1":{
"rack1":{
"server1":{
"volumes":[
{"id":1, "size":2000, "replication":"000"},
{"id":2, "size":5000, "replication":"000"},
{"id":3, "size":8000, "replication":"000"}
],
"limit":10
}
}
}
}
`
_, vl := setupPickTest(t, layout,10000)
counts := make(map[needle.VolumeId]int)
option := &VolumeGrowOption{}
n := 60000
for i := 0; i < n; i++ {
vid, _, _, _, err := vl.PickForWrite(1, option)
if err != nil {
t.Fatalf("PickForWrite: %v", err)
}
counts[vid]++
}
// vid 1 (remaining 8000) > vid 2 (remaining 5000) > vid 3 (remaining 2000)
if counts[1] <= counts[3] {
t.Errorf("expected vid 1 picked more than vid 3: vid1=%d, vid3=%d", counts[1], counts[3])
}
if counts[2] <= counts[3] {
t.Errorf("expected vid 2 picked more than vid 3: vid2=%d, vid3=%d", counts[2], counts[3])
}
// Check proportions: expected 8000/5000/2000 out of 15000
expected := map[needle.VolumeId]float64{
1: 8000.0 / 15000.0,
2: 5000.0 / 15000.0,
3: 2000.0 / 15000.0,
}
for vid, expectedPct := range expected {
actualPct := float64(counts[vid]) / float64(n)
if math.Abs(actualPct-expectedPct) > 0.03 {
t.Errorf("vid %d: expected ~%.1f%%, got %.1f%%", vid, expectedPct*100, actualPct*100)
}
}
}
func TestPickForWriteWithPendingSize(t *testing.T) {
layout := `
{
"dc1":{
"rack1":{
"server1":{
"volumes":[
{"id":1, "size":1000, "replication":"000"},
{"id":2, "size":1000, "replication":"000"}
],
"limit":10
}
}
}
}
`
_, vl := setupPickTest(t, layout,10000)
// Add large pending to vid 1, making it effectively 9000/10000
vl.RecordAssign(1, 8000)
counts := make(map[needle.VolumeId]int)
option := &VolumeGrowOption{}
n := 10000
for i := 0; i < n; i++ {
vid, _, _, _, err := vl.PickForWrite(1, option)
if err != nil {
t.Fatalf("PickForWrite: %v", err)
}
counts[vid]++
}
// vid 2 (remaining ~9000) should be picked much more than vid 1 (remaining ~1000)
ratio := float64(counts[2]) / float64(counts[1])
if ratio < 3.0 {
t.Errorf("vid2/vid1 ratio %.2f expected >= 3.0 (vid1=%d, vid2=%d)", ratio, counts[1], counts[2])
}
}
func TestPickForWriteSingleWritable(t *testing.T) {
layout := `
{
"dc1":{
"rack1":{
"server1":{
"volumes":[
{"id":1, "size":5000, "replication":"000"}
],
"limit":10
}
}
}
}
`
_, vl := setupPickTest(t, layout,10000)
option := &VolumeGrowOption{}
for i := 0; i < 100; i++ {
vid, _, _, _, err := vl.PickForWrite(1, option)
if err != nil {
t.Fatalf("PickForWrite: %v", err)
}
if vid != 1 {
t.Fatalf("expected vid 1, got %d", vid)
}
}
}
func TestPickForWriteAllNearFull(t *testing.T) {
layout := `
{
"dc1":{
"rack1":{
"server1":{
"volumes":[
{"id":1, "size":9999, "replication":"000"},
{"id":2, "size":9999, "replication":"000"}
],
"limit":10
}
}
}
}
`
_, vl := setupPickTest(t, layout,10000)
option := &VolumeGrowOption{}
for i := 0; i < 100; i++ {
vid, _, _, _, err := vl.PickForWrite(1, option)
if err != nil {
t.Fatalf("PickForWrite: %v", err)
}
if vid != 1 && vid != 2 {
t.Fatalf("expected vid 1 or 2, got %d", vid)
}
}
}
func TestPickForWriteConstrainedWeighted(t *testing.T) {
layout := `
{
"dc1":{
"rack1":{
"server1":{
"ip":"10.0.0.1",
"volumes":[
{"id":1, "size":2000, "replication":"000"},
{"id":2, "size":8000, "replication":"000"}
],
"limit":10
}
},
"rack2":{
"server2":{
"ip":"10.0.0.2",
"volumes":[
{"id":3, "size":5000, "replication":"000"}
],
"limit":10
}
}
}
}
`
_, vl := setupPickTest(t, layout,10000)
counts := make(map[needle.VolumeId]int)
option := &VolumeGrowOption{DataCenter: "dc1"}
n := 20000
for i := 0; i < n; i++ {
vid, _, _, _, err := vl.PickForWrite(1, option)
if err != nil {
t.Fatalf("PickForWrite: %v", err)
}
counts[vid]++
}
// vid 1 (remaining 8000) should be picked most, vid 2 (remaining 2000) least
if counts[1] <= counts[2] {
t.Errorf("expected vid 1 picked more than vid 2: vid1=%d, vid2=%d", counts[1], counts[2])
}
}
func TestRecordAssignMarksCrowded(t *testing.T) {
layout := `
{
"dc1":{
"rack1":{
"server1":{
"volumes":[
{"id":1, "size":8500, "replication":"000"}
],
"limit":10
}
}
}
}
`
_, vl := setupPickTest(t, layout,10000)
// Volume at 85% — not crowded yet (threshold is 90%)
_, crowded := vl.GetWritableVolumeCount()
if crowded != 0 {
t.Fatalf("expected 0 crowded, got %d", crowded)
}
// Add pending that pushes past 90%
vl.RecordAssign(1, 1000)
_, crowded = vl.GetWritableVolumeCount()
if crowded != 1 {
t.Errorf("expected 1 crowded after pending push past 90%%, got %d", crowded)
}
}
func TestHeartbeatDecaysPendingSize(t *testing.T) {
layout := `
{
"dc1":{
"rack1":{
"server1":{
"volumes":[
{"id":1, "size":1000, "replication":"000"},
{"id":2, "size":1000, "replication":"000"}
],
"limit":10
}
}
}
}
`
_, vl := setupPickTest(t, layout,10000)
// vid2size starts at 1000 (reported). Add 8000 pending → 9000.
vl.RecordAssign(1, 8000)
vl.accessLock.RLock()
if vl.vid2size[1] != 9000 {
t.Fatalf("expected vid2size=9000 after RecordAssign, got %d", vl.vid2size[1])
}
vl.accessLock.RUnlock()
// Heartbeat: volume server reports size=3000 (some writes landed).
// Old effective=9000, new reported=3000 → excess=6000 → decayed to 3000.
// So vid2size should become 3000 + 6000/2 = 6000, not just 3000.
vl.UpdateVolumeSize(1, 3000)
vl.accessLock.RLock()
if vl.vid2size[1] != 6000 {
t.Errorf("expected vid2size=6000 after decay (3000 + 6000/2), got %d", vl.vid2size[1])
}
vl.accessLock.RUnlock()
// Second heartbeat: size=5000. Old effective=6000 → excess=1000 → decay to 500.
// vid2size should become 5000 + 1000/2 = 5500.
vl.UpdateVolumeSize(1, 5000)
vl.accessLock.RLock()
if vl.vid2size[1] != 5500 {
t.Errorf("expected vid2size=5500 after second decay (5000 + 1000/2), got %d", vl.vid2size[1])
}
vl.accessLock.RUnlock()
// Third heartbeat: size=5500. Old effective=5500 → no excess.
// vid2size should be exactly 5500.
vl.UpdateVolumeSize(1, 5500)
vl.accessLock.RLock()
if vl.vid2size[1] != 5500 {
t.Errorf("expected vid2size=5500 (no excess), got %d", vl.vid2size[1])
}
vl.accessLock.RUnlock()
// vid 2 (remaining 9000) should be picked more than vid 1 (remaining 4500)
counts := make(map[needle.VolumeId]int)
option := &VolumeGrowOption{}
for i := 0; i < 10000; i++ {
vid, _, _, _, err := vl.PickForWrite(1, option)
if err != nil {
t.Fatalf("PickForWrite: %v", err)
}
counts[vid]++
}
if counts[2] <= counts[1] {
t.Errorf("vid 2 (remaining 9000) should be picked more than vid 1 (remaining 4500): vid1=%d, vid2=%d", counts[1], counts[2])
}
}
func TestHeartbeatDecayDedupReplicas(t *testing.T) {
// Volume 1 replicated on server1 and server2.
// Both servers report size=3000 in the same heartbeat cycle.
// Decay should run only once, not once per replica.
layout := `
{
"dc1":{
"rack1":{
"server1":{
"ip":"10.0.0.1",
"volumes":[
{"id":1, "size":1000, "replication":"001"}
],
"limit":10
},
"server2":{
"ip":"10.0.0.2",
"volumes":[
{"id":1, "size":1000, "replication":"001"}
],
"limit":10
}
}
}
}
`
topo := setupWithLimit(t, layout, 10000)
rp, _ := super_block.NewReplicaPlacementFromString("001")
vl := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL, types.HardDriveType)
// Add pending: effective = 1000 + 8000 = 9000
vl.RecordAssign(1, 8000)
vl.accessLock.RLock()
if vl.vid2size[1] != 9000 {
t.Fatalf("expected vid2size=9000, got %d", vl.vid2size[1])
}
vl.accessLock.RUnlock()
// Both replicas report size=3000. Decay should happen once: 3000 + (9000-3000)/2 = 6000.
// Calling UpdateVolumeSize twice simulates two replicas reporting in the same cycle.
vl.UpdateVolumeSize(1, 3000)
vl.UpdateVolumeSize(1, 3000) // second replica, same size — should be a no-op
vl.accessLock.RLock()
got := vl.vid2size[1]
vl.accessLock.RUnlock()
// Without dedup: would be 3000 + (6000-3000)/2 = 4500 (double decay).
// With dedup: should be 6000 (single decay).
if got != 6000 {
t.Errorf("expected vid2size=6000 (single decay), got %d (double decay would give 4500)", got)
}
}
func TestShouldGrowVolumesByDcAndRack_WithPendingSize(t *testing.T) {
layout := `
{
"dc1":{
"rack1":{
"server1":{
"ip":"10.0.0.1",
"volumes":[
{"id":1, "size":8500, "replication":"000"}
],
"limit":10
}
}
}
}
`
_, vl := setupPickTest(t, layout,10000)
writables := vl.CloneWritableVolumes()
if vl.ShouldGrowVolumesByDcAndRack(&writables, "dc1", "rack1") {
t.Error("should not grow before pending makes volume crowded")
}
// Add pending that pushes effective size past 9000 threshold
vl.RecordAssign(1, 600)
if !vl.ShouldGrowVolumesByDcAndRack(&writables, "dc1", "rack1") {
t.Error("should grow after pending pushes volume past crowded threshold")
}
}