From 7e4797f5883dbfc6c54c6896557bfbed627df7d1 Mon Sep 17 00:00:00 2001 From: Scott Seago Date: Mon, 1 Dec 2025 17:13:28 -0500 Subject: [PATCH] Track running backup count via BackupTracker This avoids an unnecessary apiserver List call when the backup reconciler is already at capacity. Signed-off-by: Scott Seago --- pkg/cmd/server/server.go | 1 + pkg/controller/backup_controller.go | 2 + pkg/controller/backup_queue_controller.go | 18 +++--- .../backup_queue_controller_test.go | 8 ++- pkg/controller/backup_tracker.go | 61 ++++++++++++++++--- 5 files changed, 71 insertions(+), 19 deletions(-) diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index fbff1086d..68330670d 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -917,6 +917,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string s.mgr.GetScheme(), s.logger, s.config.ConcurrentBackups, + backupTracker, ).SetupWithManager(s.mgr); err != nil { s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerBackupQueue) } diff --git a/pkg/controller/backup_controller.go b/pkg/controller/backup_controller.go index f269353cb..37faafd68 100644 --- a/pkg/controller/backup_controller.go +++ b/pkg/controller/backup_controller.go @@ -323,6 +323,8 @@ func (b *backupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr switch request.Status.Phase { case velerov1api.BackupPhaseCompleted, velerov1api.BackupPhasePartiallyFailed, velerov1api.BackupPhaseFailed, velerov1api.BackupPhaseFailedValidation: b.backupTracker.Delete(request.Namespace, request.Name) + case velerov1api.BackupPhaseWaitingForPluginOperations, velerov1api.BackupPhaseWaitingForPluginOperationsPartiallyFailed, velerov1api.BackupPhaseFinalizing, velerov1api.BackupPhaseFinalizingPartiallyFailed: + b.backupTracker.AddPostProcessing(request.Namespace, request.Name) } }() diff --git a/pkg/controller/backup_queue_controller.go b/pkg/controller/backup_queue_controller.go index 5a99c6328..556250b3e 100644 --- a/pkg/controller/backup_queue_controller.go +++ b/pkg/controller/backup_queue_controller.go @@ -47,6 +47,7 @@ type backupQueueReconciler struct { Scheme *runtime.Scheme logger logrus.FieldLogger concurrentBackups int + backupTracker BackupTracker frequency time.Duration } @@ -60,12 +61,14 @@ func NewBackupQueueReconciler( scheme *runtime.Scheme, logger logrus.FieldLogger, concurrentBackups int, + backupTracker BackupTracker, ) *backupQueueReconciler { return &backupQueueReconciler{ Client: client, Scheme: scheme, logger: logger, concurrentBackups: max(concurrentBackups, 1), + backupTracker: backupTracker, frequency: defaultQueuedBackupRecheckFrequency, } } @@ -261,11 +264,11 @@ func (r *backupQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } lister := r.newQueuedBackupsLister(allBackups) - earlierBackups, runningCount := lister.earlierThan(backup.Status.QueuePosition) - if runningCount >= r.concurrentBackups { + if r.backupTracker.RunningCount() >= r.concurrentBackups { log.Debugf("%v concurrent backups are already running, leaving %v queued", r.concurrentBackups, backup.Name) return ctrl.Result{}, nil } + earlierBackups := lister.earlierThan(backup.Status.QueuePosition) foundConflict, conflictBackup, clusterNamespaces, err := r.detectNamespaceConflict(ctx, backup, earlierBackups) if err != nil { log.WithError(err).Error("error listing namespaces") @@ -287,6 +290,7 @@ func (r *backupQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err := kube.PatchResource(original, backup, r.Client); err != nil { return ctrl.Result{}, errors.Wrapf(err, "error updating Backup status to %s", backup.Status.Phase) } + r.backupTracker.AddReadyToStart(backup.Namespace, backup.Name) log.Debug("Updating queuePosition for remaining queued backups") queuedBackups := lister.orderedQueued() newQueuePos := 1 @@ -329,21 +333,15 @@ func (r *backupQueueReconciler) newQueuedBackupsLister(backupList *velerov1api.B return &queuedBackupsLister{backupList} } -func (l *queuedBackupsLister) earlierThan(queuePos int) ([]velerov1api.Backup, int) { +func (l *queuedBackupsLister) earlierThan(queuePos int) []velerov1api.Backup { backups := []velerov1api.Backup{} - runningCount := 0 for _, backup := range l.backups.Items { // InProgress and ReadyToStart backups have QueuePosition==0 if backup.Status.QueuePosition < queuePos { backups = append(backups, backup) } - // InProgress and ReadyToStart backups count towards the concurrentBackups limit - if backup.Status.Phase == velerov1api.BackupPhaseInProgress || - backup.Status.Phase == velerov1api.BackupPhaseReadyToStart { - runningCount++ - } } - return backups, runningCount + return backups } func (l *queuedBackupsLister) orderedQueued() []velerov1api.Backup { diff --git a/pkg/controller/backup_queue_controller_test.go b/pkg/controller/backup_queue_controller_test.go index 8022d7140..2b479ed7e 100644 --- a/pkg/controller/backup_queue_controller_test.go +++ b/pkg/controller/backup_queue_controller_test.go @@ -202,9 +202,15 @@ func TestBackupQueueReconciler(t *testing.T) { return } + backupTracker := NewBackupTracker() initObjs := []runtime.Object{} for _, priorBackup := range test.priorBackups { initObjs = append(initObjs, priorBackup) + if priorBackup.Status.Phase == velerov1api.BackupPhaseReadyToStart { + backupTracker.AddReadyToStart(priorBackup.Namespace, priorBackup.Name) + } else if priorBackup.Status.Phase == velerov1api.BackupPhaseInProgress { + backupTracker.Add(priorBackup.Namespace, priorBackup.Name) + } } for _, ns := range test.namespaces { initObjs = append(initObjs, builder.ForNamespace(ns).Result()) @@ -214,7 +220,7 @@ func TestBackupQueueReconciler(t *testing.T) { fakeClient := velerotest.NewFakeControllerRuntimeClient(t, initObjs...) logger := logrus.New() log := logger.WithField("controller", "backup-queue-test") - r := NewBackupQueueReconciler(fakeClient, scheme, log, test.concurrentBackups) + r := NewBackupQueueReconciler(fakeClient, scheme, log, test.concurrentBackups, backupTracker) req := ctrl.Request{NamespacedName: types.NamespacedName{Namespace: test.backup.Namespace, Name: test.backup.Name}} res, err := r.Reconcile(t.Context(), req) gotErr := err != nil diff --git a/pkg/controller/backup_tracker.go b/pkg/controller/backup_tracker.go index 262886d56..a330b17b0 100644 --- a/pkg/controller/backup_tracker.go +++ b/pkg/controller/backup_tracker.go @@ -25,45 +25,90 @@ import ( // BackupTracker keeps track of in-progress backups. type BackupTracker interface { + // Add informs the tracker that a backup is ReadyToStart. + AddReadyToStart(ns, name string) // Add informs the tracker that a backup is in progress. Add(ns, name string) - // Delete informs the tracker that a backup is no longer in progress. + // Add informs the tracker that a backup has moved beyond InProgress + AddPostProcessing(ns, name string) + // Delete informs the tracker that a backup has reached a terminal state. Delete(ns, name string) - // Contains returns true if the tracker is tracking the backup. + // Contains returns true if backup is InProgress or post-InProgress Contains(ns, name string) bool + // RunningCount returns the number of backups which are ReadyToStart or InProgress + RunningCount() int } type backupTracker struct { - lock sync.RWMutex - backups sets.Set[string] + lock sync.RWMutex + readyToStartBackups sets.Set[string] + inProgressBackups sets.Set[string] + postProgressBackups sets.Set[string] } // NewBackupTracker returns a new BackupTracker. func NewBackupTracker() BackupTracker { return &backupTracker{ - backups: sets.New[string](), + readyToStartBackups: sets.New[string](), + inProgressBackups: sets.New[string](), + postProgressBackups: sets.New[string](), } } +func (bt *backupTracker) AddReadyToStart(ns, name string) { + bt.lock.Lock() + defer bt.lock.Unlock() + + bt.readyToStartBackups.Insert(backupTrackerKey(ns, name)) +} + func (bt *backupTracker) Add(ns, name string) { bt.lock.Lock() defer bt.lock.Unlock() - bt.backups.Insert(backupTrackerKey(ns, name)) + key := backupTrackerKey(ns, name) + bt.readyToStartBackups.Delete(key) + bt.inProgressBackups.Insert(key) +} + +func (bt *backupTracker) AddPostProcessing(ns, name string) { + bt.lock.Lock() + defer bt.lock.Unlock() + + key := backupTrackerKey(ns, name) + bt.readyToStartBackups.Delete(key) + bt.inProgressBackups.Delete(key) + bt.postProgressBackups.Insert(key) } func (bt *backupTracker) Delete(ns, name string) { bt.lock.Lock() defer bt.lock.Unlock() - bt.backups.Delete(backupTrackerKey(ns, name)) + key := backupTrackerKey(ns, name) + bt.readyToStartBackups.Delete(key) + bt.inProgressBackups.Delete(key) + bt.postProgressBackups.Delete(key) } +// Contains returns true if backup is InProgress or post-InProgress +// ignores ReadyToStart, since this is used to determine whether +// a backup is in progress and thus not able to be deleted now. func (bt *backupTracker) Contains(ns, name string) bool { bt.lock.RLock() defer bt.lock.RUnlock() - return bt.backups.Has(backupTrackerKey(ns, name)) + key := backupTrackerKey(ns, name) + return bt.inProgressBackups.Has(key) || bt.postProgressBackups.Has(key) +} + +// RunningCount returns the number of backups which are ReadyToStart or InProgress +// used by queue controller to determine whether a new backup can be started. +func (bt *backupTracker) RunningCount() int { + bt.lock.RLock() + defer bt.lock.RUnlock() + + return bt.inProgressBackups.Len() + bt.readyToStartBackups.Len() } func backupTrackerKey(ns, name string) string {