From 3087da07db7796b2467757cfd2994cb73e6ced44 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 10 Aug 2025 16:17:46 -0700 Subject: [PATCH] metrics with generation --- weed/stats/metrics.go | 23 ++++++++- weed/storage/erasure_coding/ec_shard.go | 9 ++-- weed/storage/store.go | 4 +- weed/storage/store_ec.go | 18 +++++-- weed/storage/volume.go | 2 +- weed/storage/volume_loading.go | 2 +- weed/storage/volume_vacuum.go | 2 +- weed/topology/topology_ec.go | 66 ++++++++++++++++++++++++ weed/topology/topology_event_handling.go | 2 + 9 files changed, 114 insertions(+), 14 deletions(-) diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index 2723e253f..8d08a966b 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -244,7 +244,7 @@ var ( Subsystem: "volumeServer", Name: "volumes", Help: "Number of volumes or shards.", - }, []string{"collection", "type"}) + }, []string{"collection", "type", "generation"}) VolumeServerReadOnlyVolumeGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -268,7 +268,7 @@ var ( Subsystem: "volumeServer", Name: "total_disk_size", Help: "Actual disk size used by volumes.", - }, []string{"collection", "type"}) + }, []string{"collection", "type", "generation"}) VolumeServerResourceGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -278,6 +278,23 @@ var ( Help: "Resource usage", }, []string{"name", "type"}) + // EC-specific generation metrics + MasterEcVolumeGenerationGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "master", + Name: "ec_volume_generations", + Help: "Number of EC volumes by generation and activity status.", + }, []string{"collection", "generation", "active"}) + + MasterEcShardGenerationGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "master", + Name: "ec_shard_generations", + Help: "Number of EC shards by generation and activity status.", + }, []string{"collection", "generation", "active"}) + VolumeServerConcurrentDownloadLimit = prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: Namespace, @@ -395,6 +412,8 @@ func init() { Gather.MustRegister(MasterVolumeLayoutCrowded) Gather.MustRegister(MasterPickForWriteErrorCounter) Gather.MustRegister(MasterBroadcastToFullErrorCounter) + Gather.MustRegister(MasterEcVolumeGenerationGauge) + Gather.MustRegister(MasterEcShardGenerationGauge) Gather.MustRegister(FilerRequestCounter) Gather.MustRegister(FilerHandlerCounter) diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go index 58b878da8..8b23495ef 100644 --- a/weed/storage/erasure_coding/ec_shard.go +++ b/weed/storage/erasure_coding/ec_shard.go @@ -23,11 +23,12 @@ type EcVolumeShard struct { ecdFile *os.File ecdFileSize int64 DiskType types.DiskType + Generation uint32 // generation for metrics labeling } func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string, id needle.VolumeId, shardId ShardId, generation uint32) (v *EcVolumeShard, e error) { - v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId, DiskType: diskType} + v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId, DiskType: diskType, Generation: generation} baseFileName := v.FileNameWithGeneration(generation) @@ -51,11 +52,13 @@ func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string } func (shard *EcVolumeShard) Mount() { - stats.VolumeServerVolumeGauge.WithLabelValues(shard.Collection, "ec_shards").Inc() + generationLabel := fmt.Sprintf("%d", shard.Generation) + stats.VolumeServerVolumeGauge.WithLabelValues(shard.Collection, "ec_shards", generationLabel).Inc() } func (shard *EcVolumeShard) Unmount() { - stats.VolumeServerVolumeGauge.WithLabelValues(shard.Collection, "ec_shards").Dec() + generationLabel := fmt.Sprintf("%d", shard.Generation) + stats.VolumeServerVolumeGauge.WithLabelValues(shard.Collection, "ec_shards", generationLabel).Dec() } func (shard *EcVolumeShard) Size() int64 { diff --git a/weed/storage/store.go b/weed/storage/store.go index 2d9707571..7008c4995 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -359,11 +359,11 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { } for col, size := range collectionVolumeSize { - stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size)) + stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal", "0").Set(float64(size)) } for col, deletedBytes := range collectionVolumeDeletedBytes { - stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "deleted_bytes").Set(float64(deletedBytes)) + stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "deleted_bytes", "0").Set(float64(deletedBytes)) } for col, types := range collectionVolumeReadOnlyCount { diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 39da17a80..6aa6cb6cf 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -24,21 +24,31 @@ import ( func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat { var ecShardMessages []*master_pb.VolumeEcShardInformationMessage - collectionEcShardSize := make(map[string]int64) + // Track sizes by collection+generation combination + collectionGenerationEcShardSize := make(map[string]map[uint32]int64) for diskId, location := range s.Locations { location.ecVolumesLock.RLock() for _, ecShards := range location.ecVolumes { ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage(uint32(diskId))...) + // Initialize collection map if needed + if collectionGenerationEcShardSize[ecShards.Collection] == nil { + collectionGenerationEcShardSize[ecShards.Collection] = make(map[uint32]int64) + } + for _, ecShard := range ecShards.Shards { - collectionEcShardSize[ecShards.Collection] += ecShard.Size() + collectionGenerationEcShardSize[ecShards.Collection][ecShards.Generation] += ecShard.Size() } } location.ecVolumesLock.RUnlock() } - for col, size := range collectionEcShardSize { - stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "ec").Set(float64(size)) + // Update metrics with generation labels + for col, generationSizes := range collectionGenerationEcShardSize { + for generation, size := range generationSizes { + generationLabel := fmt.Sprintf("%d", generation) + stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "ec", generationLabel).Set(float64(size)) + } } return &master_pb.Heartbeat{ diff --git a/weed/storage/volume.go b/weed/storage/volume.go index dd8ecbdce..1253bafc2 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -248,7 +248,7 @@ func (v *Volume) doClose() { glog.Warningf("Volume Close fail to sync volume %d", v.Id) } v.DataBackend = nil - stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume").Dec() + stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", "0").Dec() } } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 471401c6f..88fbab1f0 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -216,7 +216,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } } - stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume").Inc() + stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", "0").Inc() if err == nil { hasLoadedVolume = true diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 1d6cdf9e0..cea9b0dac 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -136,7 +136,7 @@ func (v *Volume) CommitCompact() error { } } v.DataBackend = nil - stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume").Dec() + stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume", "0").Dec() var e error if e = v.makeupDiff(v.FileName(".cpd"), v.FileName(".cpx"), v.FileName(".dat"), v.FileName(".idx")); e != nil { diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go index 570c7b062..7fa751e5e 100644 --- a/weed/topology/topology_ec.go +++ b/weed/topology/topology_ec.go @@ -6,6 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/needle" ) @@ -348,6 +349,71 @@ func (t *Topology) LookupEcShardsWithFallback(vid needle.VolumeId, requestedGene return nil, 0, false } +// UpdateEcGenerationMetrics updates prometheus metrics with current EC volume generation information +func (t *Topology) UpdateEcGenerationMetrics() { + t.ecShardMapLock.RLock() + defer t.ecShardMapLock.RUnlock() + + t.ecActiveGenerationMapLock.RLock() + defer t.ecActiveGenerationMapLock.RUnlock() + + // Count volumes and shards by collection, generation, and active status + volumeCountsByCollection := make(map[string]map[uint32]map[bool]int) + shardCountsByCollection := make(map[string]map[uint32]map[bool]int) + + // Initialize counting maps + for key, ecShardLocs := range t.ecShardMap { + collection := ecShardLocs.Collection + generation := key.Generation + + if volumeCountsByCollection[collection] == nil { + volumeCountsByCollection[collection] = make(map[uint32]map[bool]int) + } + if volumeCountsByCollection[collection][generation] == nil { + volumeCountsByCollection[collection][generation] = make(map[bool]int) + } + if shardCountsByCollection[collection] == nil { + shardCountsByCollection[collection] = make(map[uint32]map[bool]int) + } + if shardCountsByCollection[collection][generation] == nil { + shardCountsByCollection[collection][generation] = make(map[bool]int) + } + + // Check if this generation is active for this volume + activeGeneration, hasActiveGen := t.ecActiveGenerationMap[key.VolumeId] + isActive := hasActiveGen && activeGeneration == generation + + // Count this volume + volumeCountsByCollection[collection][generation][isActive]++ + + // Count shards in this volume + shardCount := len(ecShardLocs.Locations) + shardCountsByCollection[collection][generation][isActive] += shardCount + } + + // Update volume metrics + for collection, generationMap := range volumeCountsByCollection { + for generation, activeMap := range generationMap { + generationLabel := fmt.Sprintf("%d", generation) + for isActive, count := range activeMap { + activeLabel := fmt.Sprintf("%t", isActive) + stats.MasterEcVolumeGenerationGauge.WithLabelValues(collection, generationLabel, activeLabel).Set(float64(count)) + } + } + } + + // Update shard metrics + for collection, generationMap := range shardCountsByCollection { + for generation, activeMap := range generationMap { + generationLabel := fmt.Sprintf("%d", generation) + for isActive, count := range activeMap { + activeLabel := fmt.Sprintf("%t", isActive) + stats.MasterEcShardGenerationGauge.WithLabelValues(collection, generationLabel, activeLabel).Set(float64(count)) + } + } + } +} + // ValidateEcGenerationReadiness checks if an EC generation has sufficient shards for activation // Returns true if the generation has at least erasure_coding.DataShardsCount shards available func (t *Topology) ValidateEcGenerationReadiness(vid needle.VolumeId, generation uint32) (ready bool, availableShards int, err error) { diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go index e3ad8f2dc..1d87dfeef 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -29,6 +29,8 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g if !t.isDisableVacuum { t.Vacuum(grpcDialOption, garbageThreshold, concurrentVacuumLimitPerVolumeServer, 0, "", preallocate, true) } + // Update EC generation metrics periodically + t.UpdateEcGenerationMetrics() } else { stats.MasterReplicaPlacementMismatch.Reset() }