From 25beb7ec48a78330f847be02479463012d9ef4d3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 24 May 2026 14:09:02 -0700 Subject: [PATCH] admin: expose Prometheus metrics (#9652) * admin: add -metricsPort flag to expose Prometheus metrics The admin command had no metrics endpoint, so passing -metricsPort (as the operator does for spec.admin.metricsPort) crashed the process with "flag provided but not defined". Wire up -metricsPort/-metricsIp and start the shared Prometheus metrics server, matching filer, master, and volume. * admin: emit maintenance task and worker fleet metrics Add Prometheus metrics for the admin server's distinctive work: the maintenance task queue and the worker fleet that executes it. Task lifecycle: maintenance_tasks_by_status / _by_type gauges (snapshot of the queue), maintenance_tasks_completed_total{type,outcome} counter and maintenance_task_duration_seconds{type} histogram (recorded when a task reaches a terminal state), and last/next scan timestamp gauges. Worker fleet: workers_connected and worker_slots{used,max} gauges, plus worker_events_total{event} counting register/unregister/stale removals. Gauges are snapshotted by a background goroutine on the admin server; counters and the histogram are recorded at their event sites. * admin: read worker slot totals under lock, clear next-scan gauge when idle GetWorkers returns live worker pointers; summing CurrentLoad/MaxConcurrent outside the queue lock races with task assignment and completion. Add GetWorkerSlotTotals to aggregate under the lock. Also reset maintenance_next_scan_timestamp_seconds to 0 when the scanner is not running, so it can't retain a stale value after a stop. 
--- weed/admin/dash/admin_server.go | 52 ++++++++++++ weed/admin/dash/worker_grpc_server.go | 15 ++-- weed/admin/maintenance/maintenance_manager.go | 7 ++ weed/admin/maintenance/maintenance_queue.go | 31 ++++++- weed/command/admin.go | 17 ++++ weed/stats/metrics.go | 83 +++++++++++++++++++ 6 files changed, 198 insertions(+), 7 deletions(-) diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go index 4b2eed840..b02e869f8 100644 --- a/weed/admin/dash/admin_server.go +++ b/weed/admin/dash/admin_server.go @@ -23,6 +23,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/security" + stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/util" @@ -280,6 +281,8 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string, go server.monitorVacuumWorker(bgCtx) } + go server.publishMaintenanceMetrics(bgCtx) + return server } @@ -364,6 +367,55 @@ func (s *AdminServer) monitorVacuumWorker(ctx context.Context) { } } +// publishMaintenanceMetrics periodically snapshots the maintenance queue and +// worker fleet into Prometheus gauges. Counters and durations are recorded at +// their event sites; these gauges reflect current state at scrape resolution. 
+func (s *AdminServer) publishMaintenanceMetrics(ctx context.Context) { + const interval = 15 * time.Second + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.collectMaintenanceMetrics() + } + } +} + +func (s *AdminServer) collectMaintenanceMetrics() { + if s.maintenanceManager == nil { + return + } + + stats := s.maintenanceManager.GetStats() + + stats_collect.AdminMaintenanceTasksByStatus.Reset() + for status, count := range stats.TasksByStatus { + stats_collect.AdminMaintenanceTasksByStatus.WithLabelValues(string(status)).Set(float64(count)) + } + + stats_collect.AdminMaintenanceTasksByType.Reset() + for taskType, count := range stats.TasksByType { + stats_collect.AdminMaintenanceTasksByType.WithLabelValues(string(taskType)).Set(float64(count)) + } + + // NextScanTime is only meaningful while the scanner runs; GetStats computes + // it unconditionally, so clear the gauge when idle to avoid a stale value. 
+ if s.maintenanceManager.IsRunning() && !stats.NextScanTime.IsZero() { + stats_collect.AdminMaintenanceNextScanTimestampSeconds.Set(float64(stats.NextScanTime.Unix())) + } else { + stats_collect.AdminMaintenanceNextScanTimestampSeconds.Set(0) + } + + workers, usedSlots, maxSlots := s.maintenanceManager.GetWorkerSlotTotals() + stats_collect.AdminWorkersConnected.Set(float64(workers)) + stats_collect.AdminWorkerSlots.WithLabelValues("used").Set(float64(usedSlots)) + stats_collect.AdminWorkerSlots.WithLabelValues("max").Set(float64(maxSlots)) +} + // loadTaskConfigurationsFromPersistence loads saved task configurations from protobuf files func (s *AdminServer) loadTaskConfigurationsFromPersistence() { if s.configPersistence == nil || !s.configPersistence.IsConfigured() { diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go index eaa69a803..62995e1f2 100644 --- a/weed/admin/dash/worker_grpc_server.go +++ b/weed/admin/dash/worker_grpc_server.go @@ -16,6 +16,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" "github.com/seaweedfs/seaweedfs/weed/security" + stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -233,6 +234,7 @@ func (s *WorkerGrpcServer) WorkerStream(stream worker_pb.WorkerService_WorkerStr } s.connections[workerID] = conn s.connMutex.Unlock() + stats_collect.AdminWorkerEventsTotal.WithLabelValues("registered").Inc() // Register worker with maintenance manager s.registerWorkerWithManager(conn) @@ -265,11 +267,11 @@ func (s *WorkerGrpcServer) WorkerStream(stream worker_pb.WorkerService_WorkerStr select { case <-ctx.Done(): glog.Infof("Worker %s connection closed: %v", workerID, ctx.Err()) - s.unregisterWorker(conn) + s.unregisterWorker(conn, "unregistered") return nil case <-connCtx.Done(): glog.Infof("Worker %s connection cancelled", workerID) - 
s.unregisterWorker(conn) + s.unregisterWorker(conn, "unregistered") return nil default: } @@ -285,7 +287,7 @@ func (s *WorkerGrpcServer) WorkerStream(stream worker_pb.WorkerService_WorkerStr default: glog.Errorf("Error receiving from worker %s: %v", workerID, err) } - s.unregisterWorker(conn) + s.unregisterWorker(conn, "unregistered") return err } @@ -338,7 +340,7 @@ func (s *WorkerGrpcServer) handleWorkerMessage(conn *WorkerConnection, msg *work case *worker_pb.WorkerMessage_Shutdown: glog.Infof("Worker %s shutting down: %s", workerID, m.Shutdown.Reason) - s.unregisterWorker(conn) + s.unregisterWorker(conn, "unregistered") default: glog.Warningf("Unknown message type from worker %s", workerID) @@ -605,7 +607,7 @@ func (s *WorkerGrpcServer) safeCloseOutgoingChannel(conn *WorkerConnection, sour } // unregisterWorker removes a worker connection -func (s *WorkerGrpcServer) unregisterWorker(conn *WorkerConnection) { +func (s *WorkerGrpcServer) unregisterWorker(conn *WorkerConnection, event string) { s.connMutex.Lock() existingConn, exists := s.connections[conn.workerID] if !exists { @@ -624,6 +626,7 @@ func (s *WorkerGrpcServer) unregisterWorker(conn *WorkerConnection) { // Remove from map first to prevent duplicate cleanup attempts delete(s.connections, conn.workerID) s.connMutex.Unlock() + stats_collect.AdminWorkerEventsTotal.WithLabelValues(event).Inc() // Cancel context to signal goroutines to stop conn.cancel() @@ -665,7 +668,7 @@ func (s *WorkerGrpcServer) cleanupStaleConnections() { for _, conn := range toRemove { glog.Warningf("Cleaning up stale worker connection: %s", conn.workerID) - s.unregisterWorker(conn) + s.unregisterWorker(conn, "stale_removed") } } diff --git a/weed/admin/maintenance/maintenance_manager.go b/weed/admin/maintenance/maintenance_manager.go index 257bafdd5..9aaf2ebfa 100644 --- a/weed/admin/maintenance/maintenance_manager.go +++ b/weed/admin/maintenance/maintenance_manager.go @@ -8,6 +8,7 @@ import ( 
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" @@ -315,6 +316,7 @@ func (mm *MaintenanceManager) performScan() { glog.Infof("Starting maintenance scan...") results, err := mm.scanner.ScanForMaintenanceTasks() + stats_collect.AdminMaintenanceLastScanTimestampSeconds.SetToCurrentTime() if err != nil { // Handle scan error mm.mutex.Lock() @@ -518,6 +520,11 @@ func (mm *MaintenanceManager) GetWorkers() []*MaintenanceWorker { return mm.queue.GetWorkers() } +// GetWorkerSlotTotals returns worker count and aggregate used/max task slots. +func (mm *MaintenanceManager) GetWorkerSlotTotals() (workers, used, max int) { + return mm.queue.GetWorkerSlotTotals() +} + // TriggerScan manually triggers a maintenance scan func (mm *MaintenanceManager) TriggerScan() error { return mm.triggerScanInternal(true) diff --git a/weed/admin/maintenance/maintenance_queue.go b/weed/admin/maintenance/maintenance_queue.go index 4b0e29bce..1cec742b6 100644 --- a/weed/admin/maintenance/maintenance_queue.go +++ b/weed/admin/maintenance/maintenance_queue.go @@ -8,6 +8,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/glog" + stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" ) // NewMaintenanceQueue creates a new maintenance queue @@ -482,12 +483,14 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) { // Calculate task duration var duration time.Duration - if task.StartedAt != nil { + hadStart := task.StartedAt != nil + if hadStart { duration = completedTime.Sub(*task.StartedAt) } // Capture workerID before it may be cleared during retry originalWorkerID := task.WorkerID + taskType := string(task.Type) var taskToSave *MaintenanceTask var logFn func() @@ -577,6 +580,18 @@ func (mq 
*MaintenanceQueue) CompleteTask(taskID string, error string) { } mq.mutex.Unlock() + // Record terminal-state metrics. A retry leaves the task pending, so it + // is not counted as completed or failed here. + switch taskStatus { + case TaskStatusCompleted: + stats_collect.AdminMaintenanceTasksCompletedTotal.WithLabelValues(taskType, "completed").Inc() + case TaskStatusFailed: + stats_collect.AdminMaintenanceTasksCompletedTotal.WithLabelValues(taskType, "failed").Inc() + } + if hadStart && (taskStatus == TaskStatusCompleted || taskStatus == TaskStatusFailed) { + stats_collect.AdminMaintenanceTaskDurationSeconds.WithLabelValues(taskType).Observe(duration.Seconds()) + } + // Only persist non-terminal tasks (retries). Completed/failed tasks stay // in memory for the UI but are not written to disk — they would just // accumulate and slow down future startups. @@ -819,6 +834,20 @@ func (mq *MaintenanceQueue) GetWorkers() []*MaintenanceWorker { return workers } +// GetWorkerSlotTotals aggregates worker count and used/max task slots under the +// lock, so callers don't read live worker fields that task updates mutate. 
+func (mq *MaintenanceQueue) GetWorkerSlotTotals() (workers, used, max int) { + mq.mutex.RLock() + defer mq.mutex.RUnlock() + + for _, worker := range mq.workers { + workers++ + used += worker.CurrentLoad + max += worker.MaxConcurrent + } + return +} + // generateTaskID generates a unique ID for tasks func generateTaskID() string { const charset = "abcdefghijklmnopqrstuvwxyz0123456789" diff --git a/weed/command/admin.go b/weed/command/admin.go index b6260dd6d..b90dd64c6 100644 --- a/weed/command/admin.go +++ b/weed/command/admin.go @@ -30,6 +30,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/security" + stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/grace" ) @@ -50,6 +51,8 @@ type AdminOptions struct { dataDir *string icebergPort *int urlPrefix *string + metricsHttpPort *int + metricsHttpIp *string debug *bool debugPort *int cpuProfile *string @@ -70,6 +73,8 @@ func init() { a.readOnlyPassword = cmdAdmin.Flag.String("readOnlyPassword", "", "read-only user password (optional, for view-only access; requires adminPassword to be set)") a.icebergPort = cmdAdmin.Flag.Int("iceberg.port", 8181, "Iceberg REST Catalog port (0 to hide in UI)") a.urlPrefix = cmdAdmin.Flag.String("urlPrefix", "", "URL path prefix when running behind a reverse proxy under a subdirectory (e.g. /seaweedfs)") + a.metricsHttpPort = cmdAdmin.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") + a.metricsHttpIp = cmdAdmin.Flag.String("metricsIp", "", "metrics listen ip. 
If empty, listens on all interfaces.") a.debug = cmdAdmin.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port") a.debugPort = cmdAdmin.Flag.Int("debug.port", 6060, "http port for debugging") a.cpuProfile = cmdAdmin.Flag.String("cpuprofile", "", "cpu profile output file") @@ -160,6 +165,12 @@ var cmdAdmin = &Command{ weed admin -debug -debug.port=6060 -master="localhost:9333" weed admin -cpuprofile=cpu.prof -memprofile=mem.prof -master="localhost:9333" + Metrics: + - Use -metricsPort to expose Prometheus metrics at http://{metricsIp}:{metricsPort}/metrics + - Use -metricsIp to bind the metrics endpoint to a specific ip (default: all interfaces) + - Metrics are disabled when -metricsPort is 0 (the default) + - Example: weed admin -metricsPort=9327 -master="localhost:9333" + Configuration File: - The security.toml file is read from ".", "$HOME/.seaweedfs/", "/usr/local/etc/seaweedfs/", or "/etc/seaweedfs/", in that order @@ -257,6 +268,12 @@ func runAdmin(cmd *Command, args []string) bool { } fmt.Printf("Plugin: Enabled\n") + // Start Prometheus metrics endpoint if a port is configured + if *a.metricsHttpPort > 0 { + fmt.Printf("Metrics: http://%s/metrics\n", stats_collect.JoinHostPort(*a.metricsHttpIp, *a.metricsHttpPort)) + } + go stats_collect.StartMetricsServer(*a.metricsHttpIp, *a.metricsHttpPort) + // Set up graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index 2d6b2d576..0d4aec244 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -643,6 +643,79 @@ var ( Name: "daily_run_last_walked_ns", Help: "Per-shard timestamp (UnixNano) of the most recent successful walker fire.
0 means the shard hasn't completed a walk yet.", }, []string{"shard"}) + + AdminMaintenanceTasksByStatus = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "admin", + Name: "maintenance_tasks_by_status", + Help: "Current number of maintenance tasks by status (pending, assigned, in_progress, completed, failed, cancelled).", + }, []string{"status"}) + + AdminMaintenanceTasksByType = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "admin", + Name: "maintenance_tasks_by_type", + Help: "Current number of maintenance tasks by type.", + }, []string{"type"}) + + AdminMaintenanceTasksCompletedTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: "admin", + Name: "maintenance_tasks_completed_total", + Help: "Counter of maintenance tasks that reached a terminal state, by type and outcome (completed, failed).", + }, []string{"type", "outcome"}) + + AdminMaintenanceTaskDurationSeconds = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: Namespace, + Subsystem: "admin", + Name: "maintenance_task_duration_seconds", + Help: "Execution time of maintenance tasks that reached a terminal state, by type.", + Buckets: prometheus.ExponentialBuckets(1, 2, 16), + }, []string{"type"}) + + AdminMaintenanceLastScanTimestampSeconds = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "admin", + Name: "maintenance_last_scan_timestamp_seconds", + Help: "Unix timestamp of the most recent maintenance scan. 
0 means no scan has run yet.", + }) + + AdminMaintenanceNextScanTimestampSeconds = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "admin", + Name: "maintenance_next_scan_timestamp_seconds", + Help: "Unix timestamp of the next expected maintenance scan.", + }) + + AdminWorkersConnected = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "admin", + Name: "workers_connected", + Help: "Current number of maintenance workers known to the admin server.", + }) + + AdminWorkerSlots = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "admin", + Name: "worker_slots", + Help: "Maintenance worker task slots aggregated across workers, by state (used, max).", + }, []string{"state"}) + + AdminWorkerEventsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: "admin", + Name: "worker_events_total", + Help: "Counter of maintenance worker lifecycle events by type (registered, unregistered, stale_removed).", + }, []string{"event"}) ) func init() { @@ -726,6 +799,16 @@ func init() { Gather.MustRegister(UploadErrorCounter) + Gather.MustRegister(AdminMaintenanceTasksByStatus) + Gather.MustRegister(AdminMaintenanceTasksByType) + Gather.MustRegister(AdminMaintenanceTasksCompletedTotal) + Gather.MustRegister(AdminMaintenanceTaskDurationSeconds) + Gather.MustRegister(AdminMaintenanceLastScanTimestampSeconds) + Gather.MustRegister(AdminMaintenanceNextScanTimestampSeconds) + Gather.MustRegister(AdminWorkersConnected) + Gather.MustRegister(AdminWorkerSlots) + Gather.MustRegister(AdminWorkerEventsTotal) + go bucketMetricTTLControl() }