From 7bc57b5a5f396099a7fc02b1bd1f48871c7a109f Mon Sep 17 00:00:00 2001 From: Scott Seago Date: Thu, 13 Nov 2025 14:04:15 -0500 Subject: [PATCH] Refactor queue controller to reduce apiserver list calls Signed-off-by: Scott Seago --- pkg/controller/backup_queue_controller.go | 145 ++++++++++++---------- 1 file changed, 79 insertions(+), 66 deletions(-) diff --git a/pkg/controller/backup_queue_controller.go b/pkg/controller/backup_queue_controller.go index 7817685c2..1926f60cb 100644 --- a/pkg/controller/backup_queue_controller.go +++ b/pkg/controller/backup_queue_controller.go @@ -140,44 +140,6 @@ func (r *backupQueueReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (r *backupQueueReconciler) listQueuedBackups(ctx context.Context, ns string) ([]velerov1api.Backup, error) { - backupList := &velerov1api.BackupList{} - backups := []velerov1api.Backup{} - if err := r.Client.List(ctx, backupList, &client.ListOptions{Namespace: ns}); err != nil { - r.logger.WithError(err).Error("error listing queued backups") - return nil, err - } - for _, backup := range backupList.Items { - if backup.Status.Phase == velerov1api.BackupPhaseQueued { - backups = append(backups, backup) - } - } - return backups, nil -} - -func (r *backupQueueReconciler) listEarlierBackups(ctx context.Context, ns string, queuePos int) ([]velerov1api.Backup, int, error) { - backupList := &velerov1api.BackupList{} - backups := []velerov1api.Backup{} - runningCount := 0 - if err := r.Client.List(ctx, backupList, &client.ListOptions{Namespace: ns}); err != nil { - return nil, 0, err - } - for _, backup := range backupList.Items { - if backup.Status.Phase == velerov1api.BackupPhaseInProgress || - backup.Status.Phase == velerov1api.BackupPhaseReadyToStart || - (backup.Status.Phase == velerov1api.BackupPhaseQueued && - backup.Status.QueuePosition < queuePos) { - backups = append(backups, backup) - } - // InProgress and ReadyToStart backups count towards the concurrentBackups limit - if backup.Status.Phase == velerov1api.BackupPhaseInProgress || - backup.Status.Phase == velerov1api.BackupPhaseReadyToStart { - runningCount++ - } - } - return backups, runningCount, nil -} - func (r *backupQueueReconciler) detectNamespaceConflict(ctx context.Context, backup *velerov1api.Backup, earlierBackups []velerov1api.Backup) (bool, string, []string, error) { nsList := &corev1api.NamespaceList{} if err := r.Client.List(ctx, nsList); err != nil { @@ -240,37 +202,28 @@ func (r *backupQueueReconciler) checkForEarlierRunnableBackups(backup *velerov1a func namespacesForBackup(backup *velerov1api.Backup, clusterNamespaces []string) []string { return collections.NewNamespaceIncludesExcludes().Includes(backup.Spec.IncludedNamespaces...).Excludes(backup.Spec.ExcludedNamespaces...).ActiveNamespaces(clusterNamespaces).ResolveNamespaceList() } -func (r *backupQueueReconciler) getMaxQueuePosition(ctx context.Context, ns string) (int, error) { - queuedBackups, err := r.listQueuedBackups(ctx, ns) - if err != nil { - return 0, err - } +func (r *backupQueueReconciler) getMaxQueuePosition(lister *queuedBackupsLister) int { + queuedBackups := lister.queued() maxPos := 0 for _, backup := range queuedBackups { maxPos = max(maxPos, backup.Status.QueuePosition) } - return maxPos, nil + return maxPos } -func (r *backupQueueReconciler) orderedQueuedBackups(ctx context.Context, backup *velerov1api.Backup) ([]velerov1api.Backup, error) { - backupList := &velerov1api.BackupList{} - var returnList []velerov1api.Backup - if err := r.Client.List(ctx, backupList, &client.ListOptions{Namespace: backup.Namespace}); err != nil { +func (r *backupQueueReconciler) orderedQueuedBackupsForEnqueue(ctx context.Context, backup *velerov1api.Backup) ([]velerov1api.Backup, error) { + lister, err := r.newQueuedBackupsLister(ctx, backup.Namespace) + if err != nil { r.logger.WithError(err).Error("error listing backups") return nil, err } - orderedBackupList := queuePositionOrderFunc(backupList).(*velerov1api.BackupList) - for _, item := range orderedBackupList.Items { - if item.Status.Phase == velerov1api.BackupPhaseQueued { - returnList = append(returnList, item) - } - } - return returnList, nil + return lister.orderedQueued(), nil } + func (r *backupQueueReconciler) findQueuedBackupsToRequeue(ctx context.Context, obj client.Object) []reconcile.Request { backup := obj.(*velerov1api.Backup) requests := []reconcile.Request{} - backups, _ := r.orderedQueuedBackups(ctx, backup) + backups, _ := r.orderedQueuedBackupsForEnqueue(ctx, backup) for _, item := range backups { requests = append(requests, reconcile.Request{ NamespacedName: types.NamespacedName{ @@ -295,11 +248,12 @@ func (r *backupQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) switch backup.Status.Phase { case "", velerov1api.BackupPhaseNew: // queue new backup - maxQueuePosition, err := r.getMaxQueuePosition(ctx, backup.Namespace) + lister, err := r.newQueuedBackupsLister(ctx, backup.Namespace) if err != nil { - // error is logged in getMaxQueuePosition func + // error is logged in newQueuedBackupsLister func return ctrl.Result{}, nil //nolint:nilerr // We want to return nil to avoid requeue } + maxQueuePosition := r.getMaxQueuePosition(lister) original := backup.DeepCopy() backup.Status.Phase = velerov1api.BackupPhaseQueued backup.Status.QueuePosition = maxQueuePosition + 1 @@ -310,11 +264,12 @@ func (r *backupQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) case velerov1api.BackupPhaseQueued: // handle queued backup // Find backups ahead of this one (InProgress, ReadyToStart, or Queued with higher position) - earlierBackups, runningCount, err := r.listEarlierBackups(ctx, backup.Namespace, backup.Status.QueuePosition) + lister, err := r.newQueuedBackupsLister(ctx, backup.Namespace) if err != nil { - log.WithError(err).Error("error listing earlier backups") - return ctrl.Result{}, nil + // error is logged in newQueuedBackupsLister func + return ctrl.Result{}, nil //nolint:nilerr // We want to return nil to avoid requeue } + earlierBackups, runningCount := lister.earlierThan(backup.Status.QueuePosition) if runningCount >= r.concurrentBackups { log.Debugf("%v concurrent backups are already running, leaving %v queued", r.concurrentBackups, backup.Name) return ctrl.Result{}, nil @@ -341,11 +296,7 @@ func (r *backupQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, errors.Wrapf(err, "error updating Backup status to %s", backup.Status.Phase) } log.Debug("Updating queuePosition for remaining queued backups") - queuedBackups, err := r.orderedQueuedBackups(ctx, backup) - if err != nil { - log.WithError(err).Error("error listing queued backups") - return ctrl.Result{}, nil - } + queuedBackups := lister.orderedQueued() newQueuePos := 1 for _, queuedBackup := range queuedBackups { if queuedBackup.Name != backup.Name { @@ -366,3 +317,65 @@ func (r *backupQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } + +// queuedBackupsLister manages a list of all backups Queued, ReadyToStart, or InProgress +// with methods to return specific subsets as needed +type queuedBackupsLister struct { + backups *velerov1api.BackupList +} + +func (r *backupQueueReconciler) newQueuedBackupsLister(ctx context.Context, ns string) (*queuedBackupsLister, error) { + backupList := &velerov1api.BackupList{} + backups := []velerov1api.Backup{} + if err := r.Client.List(ctx, backupList, &client.ListOptions{Namespace: ns}); err != nil { + r.logger.WithError(err).Error("error listing backups") + return nil, err + } + for _, backup := range backupList.Items { + if backup.Status.Phase == velerov1api.BackupPhaseQueued || + backup.Status.Phase == velerov1api.BackupPhaseInProgress || + backup.Status.Phase == velerov1api.BackupPhaseReadyToStart { + backups = append(backups, backup) + } + } + backupList.Items = backups + return &queuedBackupsLister{backupList}, nil +} + +func (l *queuedBackupsLister) earlierThan(queuePos int) ([]velerov1api.Backup, int){ + backups := []velerov1api.Backup{} + runningCount := 0 + for _, backup := range l.backups.Items { + // InProgress and ReadyToStart backups have QueuePosition==0 + if backup.Status.QueuePosition < queuePos { + backups = append(backups, backup) + } + // InProgress and ReadyToStart backups count towards the concurrentBackups limit + if backup.Status.Phase == velerov1api.BackupPhaseInProgress || + backup.Status.Phase == velerov1api.BackupPhaseReadyToStart { + runningCount++ + } + } + return backups, runningCount +} + +func (l *queuedBackupsLister) queued() []velerov1api.Backup { + backups := []velerov1api.Backup{} + for _, backup := range l.backups.Items { + if backup.Status.Phase == velerov1api.BackupPhaseQueued { + backups = append(backups, backup) + } + } + return backups +} + +func (l *queuedBackupsLister) orderedQueued() []velerov1api.Backup { + var returnList []velerov1api.Backup + orderedBackupList := queuePositionOrderFunc(l.backups).(*velerov1api.BackupList) + for _, item := range orderedBackupList.Items { + if item.Status.Phase == velerov1api.BackupPhaseQueued { + returnList = append(returnList, item) + } + } + return returnList +}