/* Copyright the Velero contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package controller import ( "context" "slices" "time" "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1api "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/constant" "github.com/vmware-tanzu/velero/pkg/util/collections" "github.com/vmware-tanzu/velero/pkg/util/kube" ) // backupQueueReconciler reconciles a Backup object type backupQueueReconciler struct { client.Client Scheme *runtime.Scheme logger logrus.FieldLogger concurrentBackups int backupTracker BackupTracker frequency time.Duration } const ( defaultQueuedBackupRecheckFrequency = time.Minute ) // NewBackupQueueReconciler returns a new backupQueueReconciler func NewBackupQueueReconciler( client client.Client, scheme *runtime.Scheme, logger logrus.FieldLogger, concurrentBackups int, backupTracker BackupTracker, ) *backupQueueReconciler { return &backupQueueReconciler{ Client: client, Scheme: scheme, logger: logger, concurrentBackups: max(concurrentBackups, 1), backupTracker: backupTracker, frequency: defaultQueuedBackupRecheckFrequency, } } func queuePositionOrderFunc(objList client.ObjectList) client.ObjectList { backupList := objList.(*velerov1api.BackupList) slices.SortFunc(backupList.Items, func(backup1, backup2 velerov1api.Backup) int { if backup1.Status.QueuePosition < backup2.Status.QueuePosition { return -1 } else if backup1.Status.QueuePosition == backup2.Status.QueuePosition { return 0 } else { return 1 } }) return backupList } // SetupWithManager adds the reconciler to the manager func (r *backupQueueReconciler) SetupWithManager(mgr ctrl.Manager) error { // For periodic requeue, only consider Queued backups, order by QueuePosition gp := kube.NewGenericEventPredicate(func(object client.Object) bool { backup := object.(*velerov1api.Backup) return backup.Status.Phase == velerov1api.BackupPhaseQueued }) s := kube.NewPeriodicalEnqueueSource(r.logger.WithField("controller", constant.ControllerBackupQueue), mgr.GetClient(), &velerov1api.BackupList{}, r.frequency, kube.PeriodicalEnqueueSourceOption{ Predicates: []predicate.Predicate{gp}, OrderFunc: queuePositionOrderFunc, }) return ctrl.NewControllerManagedBy(mgr). For(&velerov1api.Backup{}, builder.WithPredicates(predicate.Funcs{ UpdateFunc: func(ue event.UpdateEvent) bool { backup := ue.ObjectNew.(*velerov1api.Backup) return backup.Status.Phase == "" || backup.Status.Phase == velerov1api.BackupPhaseNew }, CreateFunc: func(ce event.CreateEvent) bool { backup := ce.Object.(*velerov1api.Backup) return backup.Status.Phase == "" || backup.Status.Phase == velerov1api.BackupPhaseNew }, DeleteFunc: func(de event.DeleteEvent) bool { return false }, GenericFunc: func(ge event.GenericEvent) bool { return false }, })). Watches( &velerov1api.Backup{}, handler.EnqueueRequestsFromMapFunc(r.findQueuedBackupsToRequeue), builder.WithPredicates(predicate.Funcs{ UpdateFunc: func(ue event.UpdateEvent) bool { oldBackup := ue.ObjectOld.(*velerov1api.Backup) newBackup := ue.ObjectNew.(*velerov1api.Backup) return oldBackup.Status.Phase == velerov1api.BackupPhaseInProgress && newBackup.Status.Phase != velerov1api.BackupPhaseInProgress || oldBackup.Status.Phase != velerov1api.BackupPhaseQueued && newBackup.Status.Phase == velerov1api.BackupPhaseQueued && r.backupTracker.RunningCount() < r.concurrentBackups }, CreateFunc: func(event.CreateEvent) bool { return false }, DeleteFunc: func(de event.DeleteEvent) bool { return false }, GenericFunc: func(ge event.GenericEvent) bool { return false }, })). WatchesRawSource(s). Named(constant.ControllerBackupQueue). Complete(r) } func (r *backupQueueReconciler) detectNamespaceConflict(ctx context.Context, backup *velerov1api.Backup, earlierBackups []velerov1api.Backup) (bool, string, []string, error) { nsList := &corev1api.NamespaceList{} if err := r.Client.List(ctx, nsList); err != nil { return false, "", nil, err } var clusterNamespaces []string for _, ns := range nsList.Items { clusterNamespaces = append(clusterNamespaces, ns.Name) } foundConflict, conflictBackup := detectNSConflictsInternal(backup, earlierBackups, clusterNamespaces) return foundConflict, conflictBackup, clusterNamespaces, nil } func detectNSConflictsInternal(backup *velerov1api.Backup, earlierBackups []velerov1api.Backup, clusterNamespaces []string) (bool, string) { backupNamespaces := sets.NewString(namespacesForBackup(backup, clusterNamespaces)...) for _, earlierBackup := range earlierBackups { // This will never be true for the primary backup, but for the secondary // runnability check for queued backups ahead of the current backup, we // only care about backups ahead of it. // Backup isn't earlier than this one, skip if earlierBackup.Status.Phase == velerov1api.BackupPhaseQueued && earlierBackup.Status.QueuePosition >= backup.Status.QueuePosition { continue } if backupNamespaces.HasAny(namespacesForBackup(&earlierBackup, clusterNamespaces)...) { return true, earlierBackup.Name } } return false, "" } // Returns true if there are backups ahead of the current backup that are runnable // This could happen if velero just reconciled the one earlier in the queue and rejected it // due to too many running backups, but a backup completed in between that reconcile and this one // so exit, as the recent completion has triggered another reconcile of all queued backups func (r *backupQueueReconciler) checkForEarlierRunnableBackups(backup *velerov1api.Backup, earlierBackups []velerov1api.Backup, clusterNamespaces []string) (bool, string) { for _, earlierBackup := range earlierBackups { // if this backup is queued and ahead of current backup, check for conflicts if earlierBackup.Status.Phase != velerov1api.BackupPhaseQueued || earlierBackup.Status.QueuePosition >= backup.Status.QueuePosition { continue } conflict, _ := detectNSConflictsInternal(&earlierBackup, earlierBackups, clusterNamespaces) // !conflict means we've found an earlier backup that is currently runnable // so current reconcile should exit to run this one if !conflict { return true, earlierBackup.Name } } return false, "" } func namespacesForBackup(backup *velerov1api.Backup, clusterNamespaces []string) []string { // Ignore error here. If a backup has invalid namespace wildcards, the backup controller // will validate and fail it. Consider the ns list empty for conflict detection purposes. nsList, err := collections.NewNamespaceIncludesExcludes().Includes(backup.Spec.IncludedNamespaces...).Excludes(backup.Spec.ExcludedNamespaces...).ActiveNamespaces(clusterNamespaces).ResolveNamespaceList() if err != nil { return []string{} } return nsList } func (r *backupQueueReconciler) getMaxQueuePosition(lister *queuedBackupsLister) int { queuedBackups := lister.orderedQueued() maxPos := 0 if len(queuedBackups) > 0 { maxPos = queuedBackups[len(queuedBackups)-1].Status.QueuePosition } return maxPos } func (r *backupQueueReconciler) findQueuedBackupsToRequeue(ctx context.Context, obj client.Object) []reconcile.Request { backup := obj.(*velerov1api.Backup) requests := []reconcile.Request{} allBackups := &velerov1api.BackupList{} if err := r.Client.List(ctx, allBackups, &client.ListOptions{Namespace: backup.Namespace}); err != nil { r.logger.WithError(err).Error("error listing backups") return requests } backups := r.newQueuedBackupsLister(allBackups).orderedQueued() for _, item := range backups { requests = append(requests, reconcile.Request{ NamespacedName: types.NamespacedName{ Namespace: item.GetNamespace(), Name: item.GetName(), }, }) } return requests } // Reconcile reconciles a Backup object func (r *backupQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := r.logger.WithField("backup", req.NamespacedName.String()) log.Debug("Getting backup") backup := &velerov1api.Backup{} if err := r.Get(ctx, req.NamespacedName, backup); err != nil { log.WithError(err).Error("unable to get backup") return ctrl.Result{}, client.IgnoreNotFound(err) } switch backup.Status.Phase { case "", velerov1api.BackupPhaseNew: // queue new backup allBackups := &velerov1api.BackupList{} if err := r.Client.List(ctx, allBackups, &client.ListOptions{Namespace: backup.Namespace}); err != nil { r.logger.WithError(err).Error("error listing backups") return ctrl.Result{}, nil } lister := r.newQueuedBackupsLister(allBackups) maxQueuePosition := r.getMaxQueuePosition(lister) original := backup.DeepCopy() backup.Status.Phase = velerov1api.BackupPhaseQueued backup.Status.QueuePosition = maxQueuePosition + 1 log.Infof("Queueing backup %v, queue position %v", backup.Name, backup.Status.QueuePosition) if err := kube.PatchResource(original, backup, r.Client); err != nil { return ctrl.Result{}, errors.Wrapf(err, "error updating Backup status to %s", backup.Status.Phase) } case velerov1api.BackupPhaseQueued: // handle queued backup // Find backups ahead of this one (InProgress, ReadyToStart, or Queued with higher position) allBackups := &velerov1api.BackupList{} if err := r.Client.List(ctx, allBackups, &client.ListOptions{Namespace: backup.Namespace}); err != nil { r.logger.WithError(err).Error("error listing backups") return ctrl.Result{}, nil } lister := r.newQueuedBackupsLister(allBackups) if r.backupTracker.RunningCount() >= r.concurrentBackups { log.Debugf("%v concurrent backups are already running, leaving %v queued", r.concurrentBackups, backup.Name) return ctrl.Result{}, nil } earlierBackups := lister.earlierThan(backup.Status.QueuePosition) foundConflict, conflictBackup, clusterNamespaces, err := r.detectNamespaceConflict(ctx, backup, earlierBackups) if err != nil { log.WithError(err).Error("error listing namespaces") return ctrl.Result{}, nil } if foundConflict { log.Infof("Backup %v has namespace conflict with %v, leaving queued", backup.Name, conflictBackup) return ctrl.Result{}, nil } foundEarlierRunnable, earlierRunnable := r.checkForEarlierRunnableBackups(backup, earlierBackups, clusterNamespaces) if foundEarlierRunnable { log.Infof("Earlier queued backup %v is runnable, leaving %v queued", earlierRunnable, backup.Name) return ctrl.Result{}, nil } log.Infof("Dequeueing backup %v, moving to ReadyToStart", backup.Name) original := backup.DeepCopy() backup.Status.Phase = velerov1api.BackupPhaseReadyToStart backup.Status.QueuePosition = 0 if err := kube.PatchResource(original, backup, r.Client); err != nil { return ctrl.Result{}, errors.Wrapf(err, "error updating Backup status to %s", backup.Status.Phase) } r.backupTracker.AddReadyToStart(backup.Namespace, backup.Name) log.Debug("Updating queuePosition for remaining queued backups") queuedBackups := lister.orderedQueued() newQueuePos := 1 for _, queuedBackup := range queuedBackups { if queuedBackup.Name != backup.Name { original := queuedBackup.DeepCopy() queuedBackup.Status.QueuePosition = newQueuePos if err := kube.PatchResource(original, &queuedBackup, r.Client); err != nil { log.WithError(errors.Wrapf(err, "error updating Backup %s queuePosition to %v", queuedBackup.Name, newQueuePos)) return ctrl.Result{}, nil } newQueuePos++ } } return ctrl.Result{}, nil default: log.Debug("Backup is not New or Queued, skipping") return ctrl.Result{}, nil } return ctrl.Result{}, nil } // queuedBackupsLister manages a list of all backups Queued, ReadyToStart, or InProgress // with methods to return specific subsets as needed type queuedBackupsLister struct { backups *velerov1api.BackupList } func (r *backupQueueReconciler) newQueuedBackupsLister(backupList *velerov1api.BackupList) *queuedBackupsLister { backups := []velerov1api.Backup{} for _, backup := range backupList.Items { if backup.Status.Phase == velerov1api.BackupPhaseQueued || backup.Status.Phase == velerov1api.BackupPhaseInProgress || backup.Status.Phase == velerov1api.BackupPhaseReadyToStart { backups = append(backups, backup) } } backupList.Items = backups return &queuedBackupsLister{backupList} } func (l *queuedBackupsLister) earlierThan(queuePos int) []velerov1api.Backup { backups := []velerov1api.Backup{} for _, backup := range l.backups.Items { // InProgress and ReadyToStart backups have QueuePosition==0 if backup.Status.QueuePosition < queuePos { backups = append(backups, backup) } } return backups } func (l *queuedBackupsLister) orderedQueued() []velerov1api.Backup { var returnList []velerov1api.Backup orderedBackupList := queuePositionOrderFunc(l.backups).(*velerov1api.BackupList) for _, item := range orderedBackupList.Items { if item.Status.Phase == velerov1api.BackupPhaseQueued { returnList = append(returnList, item) } } return returnList }