admin: expose Prometheus metrics (#9652)

* admin: add -metricsPort flag to expose Prometheus metrics

The admin command had no metrics endpoint, so passing -metricsPort
(as the operator does for spec.admin.metricsPort) crashed the process
with "flag provided but not defined". Wire up -metricsPort/-metricsIp
and start the shared Prometheus metrics server, matching filer, master,
and volume.

* admin: emit maintenance task and worker fleet metrics

Add Prometheus metrics for the admin server's distinctive work: the
maintenance task queue and the worker fleet that executes it.

Task lifecycle: maintenance_tasks_by_status / _by_type gauges (snapshot
of the queue), maintenance_tasks_completed_total{type,outcome} counter
and maintenance_task_duration_seconds{type} histogram (recorded when a
task reaches a terminal state), and last/next scan timestamp gauges.

Worker fleet: workers_connected and worker_slots{state} (state is "used" or
"max") gauges, plus worker_events_total{event} counting
register/unregister/stale removals.

Queue and worker gauges are snapshotted every 15 seconds by a background
goroutine on the admin server; the counters, the histogram, and the last-scan
timestamp gauge are recorded at their event sites.

* admin: read worker slot totals under lock, clear next-scan gauge when idle

GetWorkers returns live worker pointers; summing CurrentLoad/MaxConcurrent
outside the queue lock races with task assignment and completion. Add
GetWorkerSlotTotals to aggregate under the lock.

Also reset maintenance_next_scan_timestamp_seconds to 0 when the scanner
is not running, so it can't retain a stale value after a stop.
This commit is contained in:
Chris Lu
2026-05-24 14:09:02 -07:00
committed by GitHub
parent 6fc212cedb
commit 25beb7ec48
6 changed files with 198 additions and 7 deletions

View File

@@ -23,6 +23,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/util"
@@ -280,6 +281,8 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string,
go server.monitorVacuumWorker(bgCtx)
}
go server.publishMaintenanceMetrics(bgCtx)
return server
}
@@ -364,6 +367,55 @@ func (s *AdminServer) monitorVacuumWorker(ctx context.Context) {
}
}
// publishMaintenanceMetrics snapshots the maintenance queue and worker fleet
// into Prometheus gauges: once immediately, then every 15 seconds until ctx is
// cancelled. Counters and durations are recorded at their event sites; the
// gauges published here reflect state as of the most recent snapshot, so they
// can lag the live state by up to one interval.
//
// It blocks until ctx is done and is meant to be run in its own goroutine.
func (s *AdminServer) publishMaintenanceMetrics(ctx context.Context) {
	const interval = 15 * time.Second

	// A ticker delivers its first tick only after a full interval. Publish one
	// snapshot up front so scrapes right after startup already see the gauges.
	s.collectMaintenanceMetrics()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			s.collectMaintenanceMetrics()
		}
	}
}
// collectMaintenanceMetrics publishes one snapshot of maintenance state to the
// admin Prometheus gauges: task counts per status and per type, the next-scan
// timestamp, and the worker count with aggregate used/max slots. It is a no-op
// when the server has no maintenance manager.
func (s *AdminServer) collectMaintenanceMetrics() {
	manager := s.maintenanceManager
	if manager == nil {
		return
	}

	snapshot := manager.GetStats()

	// Reset drops all existing series, so after the loop only the labels
	// present in this snapshot remain.
	byStatus := stats_collect.AdminMaintenanceTasksByStatus
	byStatus.Reset()
	for status, n := range snapshot.TasksByStatus {
		byStatus.WithLabelValues(string(status)).Set(float64(n))
	}

	byType := stats_collect.AdminMaintenanceTasksByType
	byType.Reset()
	for taskType, n := range snapshot.TasksByType {
		byType.WithLabelValues(string(taskType)).Set(float64(n))
	}

	// GetStats fills in NextScanTime whether or not the scanner is running, so
	// publish 0 unless the scanner is active and the time is set; otherwise a
	// stale timestamp would linger after a stop.
	var nextScan float64
	if manager.IsRunning() && !snapshot.NextScanTime.IsZero() {
		nextScan = float64(snapshot.NextScanTime.Unix())
	}
	stats_collect.AdminMaintenanceNextScanTimestampSeconds.Set(nextScan)

	workerCount, slotsUsed, slotsMax := manager.GetWorkerSlotTotals()
	stats_collect.AdminWorkersConnected.Set(float64(workerCount))

	slots := stats_collect.AdminWorkerSlots
	slots.WithLabelValues("used").Set(float64(slotsUsed))
	slots.WithLabelValues("max").Set(float64(slotsMax))
}
// loadTaskConfigurationsFromPersistence loads saved task configurations from protobuf files
func (s *AdminServer) loadTaskConfigurationsFromPersistence() {
if s.configPersistence == nil || !s.configPersistence.IsConfigured() {

View File

@@ -16,6 +16,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -233,6 +234,7 @@ func (s *WorkerGrpcServer) WorkerStream(stream worker_pb.WorkerService_WorkerStr
}
s.connections[workerID] = conn
s.connMutex.Unlock()
stats_collect.AdminWorkerEventsTotal.WithLabelValues("registered").Inc()
// Register worker with maintenance manager
s.registerWorkerWithManager(conn)
@@ -265,11 +267,11 @@ func (s *WorkerGrpcServer) WorkerStream(stream worker_pb.WorkerService_WorkerStr
select {
case <-ctx.Done():
glog.Infof("Worker %s connection closed: %v", workerID, ctx.Err())
s.unregisterWorker(conn)
s.unregisterWorker(conn, "unregistered")
return nil
case <-connCtx.Done():
glog.Infof("Worker %s connection cancelled", workerID)
s.unregisterWorker(conn)
s.unregisterWorker(conn, "unregistered")
return nil
default:
}
@@ -285,7 +287,7 @@ func (s *WorkerGrpcServer) WorkerStream(stream worker_pb.WorkerService_WorkerStr
default:
glog.Errorf("Error receiving from worker %s: %v", workerID, err)
}
s.unregisterWorker(conn)
s.unregisterWorker(conn, "unregistered")
return err
}
@@ -338,7 +340,7 @@ func (s *WorkerGrpcServer) handleWorkerMessage(conn *WorkerConnection, msg *work
case *worker_pb.WorkerMessage_Shutdown:
glog.Infof("Worker %s shutting down: %s", workerID, m.Shutdown.Reason)
s.unregisterWorker(conn)
s.unregisterWorker(conn, "unregistered")
default:
glog.Warningf("Unknown message type from worker %s", workerID)
@@ -605,7 +607,7 @@ func (s *WorkerGrpcServer) safeCloseOutgoingChannel(conn *WorkerConnection, sour
}
// unregisterWorker removes a worker connection
func (s *WorkerGrpcServer) unregisterWorker(conn *WorkerConnection) {
func (s *WorkerGrpcServer) unregisterWorker(conn *WorkerConnection, event string) {
s.connMutex.Lock()
existingConn, exists := s.connections[conn.workerID]
if !exists {
@@ -624,6 +626,7 @@ func (s *WorkerGrpcServer) unregisterWorker(conn *WorkerConnection) {
// Remove from map first to prevent duplicate cleanup attempts
delete(s.connections, conn.workerID)
s.connMutex.Unlock()
stats_collect.AdminWorkerEventsTotal.WithLabelValues(event).Inc()
// Cancel context to signal goroutines to stop
conn.cancel()
@@ -665,7 +668,7 @@ func (s *WorkerGrpcServer) cleanupStaleConnections() {
for _, conn := range toRemove {
glog.Warningf("Cleaning up stale worker connection: %s", conn.workerID)
s.unregisterWorker(conn)
s.unregisterWorker(conn, "stale_removed")
}
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
@@ -315,6 +316,7 @@ func (mm *MaintenanceManager) performScan() {
glog.Infof("Starting maintenance scan...")
results, err := mm.scanner.ScanForMaintenanceTasks()
stats_collect.AdminMaintenanceLastScanTimestampSeconds.SetToCurrentTime()
if err != nil {
// Handle scan error
mm.mutex.Lock()
@@ -518,6 +520,11 @@ func (mm *MaintenanceManager) GetWorkers() []*MaintenanceWorker {
return mm.queue.GetWorkers()
}
// GetWorkerSlotTotals reports the number of workers tracked by the maintenance
// queue together with their summed used and maximum task slots. It delegates
// to the queue's GetWorkerSlotTotals.
func (mm *MaintenanceManager) GetWorkerSlotTotals() (workers, used, max int) {
	workers, used, max = mm.queue.GetWorkerSlotTotals()
	return workers, used, max
}
// TriggerScan manually triggers a maintenance scan
func (mm *MaintenanceManager) TriggerScan() error {
return mm.triggerScanInternal(true)

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
)
// NewMaintenanceQueue creates a new maintenance queue
@@ -482,12 +483,14 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
// Calculate task duration
var duration time.Duration
if task.StartedAt != nil {
hadStart := task.StartedAt != nil
if hadStart {
duration = completedTime.Sub(*task.StartedAt)
}
// Capture workerID before it may be cleared during retry
originalWorkerID := task.WorkerID
taskType := string(task.Type)
var taskToSave *MaintenanceTask
var logFn func()
@@ -577,6 +580,18 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
}
mq.mutex.Unlock()
// Record terminal-state metrics. A retry leaves the task pending, so it
// is not counted as completed or failed here.
switch taskStatus {
case TaskStatusCompleted:
stats_collect.AdminMaintenanceTasksCompletedTotal.WithLabelValues(taskType, "completed").Inc()
case TaskStatusFailed:
stats_collect.AdminMaintenanceTasksCompletedTotal.WithLabelValues(taskType, "failed").Inc()
}
if hadStart && (taskStatus == TaskStatusCompleted || taskStatus == TaskStatusFailed) {
stats_collect.AdminMaintenanceTaskDurationSeconds.WithLabelValues(taskType).Observe(duration.Seconds())
}
// Only persist non-terminal tasks (retries). Completed/failed tasks stay
// in memory for the UI but are not written to disk — they would just
// accumulate and slow down future startups.
@@ -819,6 +834,20 @@ func (mq *MaintenanceQueue) GetWorkers() []*MaintenanceWorker {
return workers
}
// GetWorkerSlotTotals returns how many workers the queue holds, plus the sum of
// their CurrentLoad (used) and MaxConcurrent (max) values. The totals are
// computed while holding the queue's read lock, so callers do not have to read
// live worker fields themselves.
func (mq *MaintenanceQueue) GetWorkerSlotTotals() (workers, used, max int) {
	mq.mutex.RLock()
	defer mq.mutex.RUnlock()

	workers = len(mq.workers)
	for _, w := range mq.workers {
		used += w.CurrentLoad
		max += w.MaxConcurrent
	}
	return workers, used, max
}
// generateTaskID generates a unique ID for tasks
func generateTaskID() string {
const charset = "abcdefghijklmnopqrstuvwxyz0123456789"

View File

@@ -30,6 +30,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/security"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
)
@@ -50,6 +51,8 @@ type AdminOptions struct {
dataDir *string
icebergPort *int
urlPrefix *string
metricsHttpPort *int
metricsHttpIp *string
debug *bool
debugPort *int
cpuProfile *string
@@ -70,6 +73,8 @@ func init() {
a.readOnlyPassword = cmdAdmin.Flag.String("readOnlyPassword", "", "read-only user password (optional, for view-only access; requires adminPassword to be set)")
a.icebergPort = cmdAdmin.Flag.Int("iceberg.port", 8181, "Iceberg REST Catalog port (0 to hide in UI)")
a.urlPrefix = cmdAdmin.Flag.String("urlPrefix", "", "URL path prefix when running behind a reverse proxy under a subdirectory (e.g. /seaweedfs)")
a.metricsHttpPort = cmdAdmin.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
a.metricsHttpIp = cmdAdmin.Flag.String("metricsIp", "", "metrics listen ip. If empty, listens on all interfaces.")
a.debug = cmdAdmin.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port")
a.debugPort = cmdAdmin.Flag.Int("debug.port", 6060, "http port for debugging")
a.cpuProfile = cmdAdmin.Flag.String("cpuprofile", "", "cpu profile output file")
@@ -160,6 +165,12 @@ var cmdAdmin = &Command{
weed admin -debug -debug.port=6060 -master="localhost:9333"
weed admin -cpuprofile=cpu.prof -memprofile=mem.prof -master="localhost:9333"
Metrics:
- Use -metricsPort to expose Prometheus metrics at http://<host>:<metricsPort>/metrics
- Use -metricsIp to bind the metrics endpoint to a specific ip (default: all interfaces)
- Metrics are disabled when -metricsPort is 0 (the default)
- Example: weed admin -metricsPort=9327 -master="localhost:9333"
Configuration File:
- The security.toml file is read from ".", "$HOME/.seaweedfs/",
"/usr/local/etc/seaweedfs/", or "/etc/seaweedfs/", in that order
@@ -257,6 +268,12 @@ func runAdmin(cmd *Command, args []string) bool {
}
fmt.Printf("Plugin: Enabled\n")
// Start Prometheus metrics endpoint if a port is configured
if *a.metricsHttpPort > 0 {
fmt.Printf("Metrics: http://%s/metrics\n", stats_collect.JoinHostPort(*a.metricsHttpIp, *a.metricsHttpPort))
}
go stats_collect.StartMetricsServer(*a.metricsHttpIp, *a.metricsHttpPort)
// Set up graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View File

@@ -643,6 +643,79 @@ var (
Name: "daily_run_last_walked_ns",
Help: "Per-shard timestamp (UnixNano) of the most recent successful walker fire. 0 means the shard hasn't completed a walk yet.",
}, []string{"shard"})
// AdminMaintenanceTasksByStatus is a gauge vector, labelled by "status", with
// the current number of maintenance tasks in each status. The admin server's
// periodic snapshot resets it and refills it from the maintenance stats.
AdminMaintenanceTasksByStatus = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "admin",
Name: "maintenance_tasks_by_status",
Help: "Current number of maintenance tasks by status (pending, assigned, in_progress, completed, failed, cancelled).",
}, []string{"status"})
// AdminMaintenanceTasksByType is a gauge vector, labelled by "type", with the
// current number of maintenance tasks of each task type. It is reset and
// refilled by the same periodic snapshot as the by-status gauge.
AdminMaintenanceTasksByType = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "admin",
Name: "maintenance_tasks_by_type",
Help: "Current number of maintenance tasks by type.",
}, []string{"type"})
// AdminMaintenanceTasksCompletedTotal counts tasks that ended in a terminal
// state, labelled by task "type" and "outcome" ("completed" or "failed").
// A task that is retried is not counted.
AdminMaintenanceTasksCompletedTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: "admin",
Name: "maintenance_tasks_completed_total",
Help: "Counter of maintenance tasks that reached a terminal state, by type and outcome (completed, failed).",
}, []string{"type", "outcome"})
// AdminMaintenanceTaskDurationSeconds is a histogram, labelled by task "type",
// of the time between a task's start and its completion. It is observed only
// for completed or failed tasks that have a recorded start time. The 16
// buckets double from 1 second up to 32768 seconds.
AdminMaintenanceTaskDurationSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: "admin",
Name: "maintenance_task_duration_seconds",
Help: "Execution time of maintenance tasks that reached a terminal state, by type.",
Buckets: prometheus.ExponentialBuckets(1, 2, 16),
}, []string{"type"})
// AdminMaintenanceLastScanTimestampSeconds is set to the current time each
// time a maintenance scan returns, whether or not the scan reported an error.
AdminMaintenanceLastScanTimestampSeconds = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "admin",
Name: "maintenance_last_scan_timestamp_seconds",
Help: "Unix timestamp of the most recent maintenance scan. 0 means no scan has run yet.",
})
// AdminMaintenanceNextScanTimestampSeconds holds the Unix time of the next
// expected maintenance scan. The admin server's periodic snapshot sets it to 0
// whenever the scanner is not running or no next scan time is available, so
// the Help text documents that sentinel the same way the last-scan gauge does.
AdminMaintenanceNextScanTimestampSeconds = prometheus.NewGauge(
	prometheus.GaugeOpts{
		Namespace: Namespace,
		Subsystem: "admin",
		Name:      "maintenance_next_scan_timestamp_seconds",
		Help:      "Unix timestamp of the next expected maintenance scan. 0 means no scan is scheduled (the scanner is not running).",
	})
// AdminWorkersConnected is the number of workers currently held by the
// maintenance queue, as published by the admin server's periodic snapshot.
AdminWorkersConnected = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "admin",
Name: "workers_connected",
Help: "Current number of maintenance workers known to the admin server.",
})
// AdminWorkerSlots is a gauge vector labelled by "state": "used" is the sum of
// the workers' current load and "max" is the sum of their maximum concurrency.
AdminWorkerSlots = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "admin",
Name: "worker_slots",
Help: "Maintenance worker task slots aggregated across workers, by state (used, max).",
}, []string{"state"})
// AdminWorkerEventsTotal counts worker lifecycle events, labelled by "event":
// "registered" when a worker connection is stored, "unregistered" when a
// connection is removed, and "stale_removed" when stale-connection cleanup
// removes it.
AdminWorkerEventsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: "admin",
Name: "worker_events_total",
Help: "Counter of maintenance worker lifecycle events by type (registered, unregistered, stale_removed).",
}, []string{"event"})
)
func init() {
@@ -726,6 +799,16 @@ func init() {
Gather.MustRegister(UploadErrorCounter)
Gather.MustRegister(AdminMaintenanceTasksByStatus)
Gather.MustRegister(AdminMaintenanceTasksByType)
Gather.MustRegister(AdminMaintenanceTasksCompletedTotal)
Gather.MustRegister(AdminMaintenanceTaskDurationSeconds)
Gather.MustRegister(AdminMaintenanceLastScanTimestampSeconds)
Gather.MustRegister(AdminMaintenanceNextScanTimestampSeconds)
Gather.MustRegister(AdminWorkersConnected)
Gather.MustRegister(AdminWorkerSlots)
Gather.MustRegister(AdminWorkerEventsTotal)
go bucketMetricTTLControl()
}