mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-01-06 21:36:30 +00:00
Refactor queue controller to reduce apiserver list calls
Signed-off-by: Scott Seago <sseago@redhat.com>
This commit is contained in:
@@ -140,44 +140,6 @@ func (r *backupQueueReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
Complete(r)
|
||||
}
|
||||
|
||||
func (r *backupQueueReconciler) listQueuedBackups(ctx context.Context, ns string) ([]velerov1api.Backup, error) {
|
||||
backupList := &velerov1api.BackupList{}
|
||||
backups := []velerov1api.Backup{}
|
||||
if err := r.Client.List(ctx, backupList, &client.ListOptions{Namespace: ns}); err != nil {
|
||||
r.logger.WithError(err).Error("error listing queued backups")
|
||||
return nil, err
|
||||
}
|
||||
for _, backup := range backupList.Items {
|
||||
if backup.Status.Phase == velerov1api.BackupPhaseQueued {
|
||||
backups = append(backups, backup)
|
||||
}
|
||||
}
|
||||
return backups, nil
|
||||
}
|
||||
|
||||
func (r *backupQueueReconciler) listEarlierBackups(ctx context.Context, ns string, queuePos int) ([]velerov1api.Backup, int, error) {
|
||||
backupList := &velerov1api.BackupList{}
|
||||
backups := []velerov1api.Backup{}
|
||||
runningCount := 0
|
||||
if err := r.Client.List(ctx, backupList, &client.ListOptions{Namespace: ns}); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
for _, backup := range backupList.Items {
|
||||
if backup.Status.Phase == velerov1api.BackupPhaseInProgress ||
|
||||
backup.Status.Phase == velerov1api.BackupPhaseReadyToStart ||
|
||||
(backup.Status.Phase == velerov1api.BackupPhaseQueued &&
|
||||
backup.Status.QueuePosition < queuePos) {
|
||||
backups = append(backups, backup)
|
||||
}
|
||||
// InProgress and ReadyToStart backups count towards the concurrentBackups limit
|
||||
if backup.Status.Phase == velerov1api.BackupPhaseInProgress ||
|
||||
backup.Status.Phase == velerov1api.BackupPhaseReadyToStart {
|
||||
runningCount++
|
||||
}
|
||||
}
|
||||
return backups, runningCount, nil
|
||||
}
|
||||
|
||||
func (r *backupQueueReconciler) detectNamespaceConflict(ctx context.Context, backup *velerov1api.Backup, earlierBackups []velerov1api.Backup) (bool, string, []string, error) {
|
||||
nsList := &corev1api.NamespaceList{}
|
||||
if err := r.Client.List(ctx, nsList); err != nil {
|
||||
@@ -240,37 +202,28 @@ func (r *backupQueueReconciler) checkForEarlierRunnableBackups(backup *velerov1a
|
||||
func namespacesForBackup(backup *velerov1api.Backup, clusterNamespaces []string) []string {
|
||||
return collections.NewNamespaceIncludesExcludes().Includes(backup.Spec.IncludedNamespaces...).Excludes(backup.Spec.ExcludedNamespaces...).ActiveNamespaces(clusterNamespaces).ResolveNamespaceList()
|
||||
}
|
||||
func (r *backupQueueReconciler) getMaxQueuePosition(ctx context.Context, ns string) (int, error) {
|
||||
queuedBackups, err := r.listQueuedBackups(ctx, ns)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
func (r *backupQueueReconciler) getMaxQueuePosition(lister *queuedBackupsLister) int {
|
||||
queuedBackups := lister.queued()
|
||||
maxPos := 0
|
||||
for _, backup := range queuedBackups {
|
||||
maxPos = max(maxPos, backup.Status.QueuePosition)
|
||||
}
|
||||
return maxPos, nil
|
||||
return maxPos
|
||||
}
|
||||
|
||||
func (r *backupQueueReconciler) orderedQueuedBackups(ctx context.Context, backup *velerov1api.Backup) ([]velerov1api.Backup, error) {
|
||||
backupList := &velerov1api.BackupList{}
|
||||
var returnList []velerov1api.Backup
|
||||
if err := r.Client.List(ctx, backupList, &client.ListOptions{Namespace: backup.Namespace}); err != nil {
|
||||
func (r *backupQueueReconciler) orderedQueuedBackupsForEnqueue(ctx context.Context, backup *velerov1api.Backup) ([]velerov1api.Backup, error) {
|
||||
lister, err := r.newQueuedBackupsLister(ctx, backup.Namespace)
|
||||
if err != nil {
|
||||
r.logger.WithError(err).Error("error listing backups")
|
||||
return nil, err
|
||||
}
|
||||
orderedBackupList := queuePositionOrderFunc(backupList).(*velerov1api.BackupList)
|
||||
for _, item := range orderedBackupList.Items {
|
||||
if item.Status.Phase == velerov1api.BackupPhaseQueued {
|
||||
returnList = append(returnList, item)
|
||||
}
|
||||
}
|
||||
return returnList, nil
|
||||
return lister.orderedQueued(), nil
|
||||
}
|
||||
|
||||
func (r *backupQueueReconciler) findQueuedBackupsToRequeue(ctx context.Context, obj client.Object) []reconcile.Request {
|
||||
backup := obj.(*velerov1api.Backup)
|
||||
requests := []reconcile.Request{}
|
||||
backups, _ := r.orderedQueuedBackups(ctx, backup)
|
||||
backups, _ := r.orderedQueuedBackupsForEnqueue(ctx, backup)
|
||||
for _, item := range backups {
|
||||
requests = append(requests, reconcile.Request{
|
||||
NamespacedName: types.NamespacedName{
|
||||
@@ -295,11 +248,12 @@ func (r *backupQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request)
|
||||
switch backup.Status.Phase {
|
||||
case "", velerov1api.BackupPhaseNew:
|
||||
// queue new backup
|
||||
maxQueuePosition, err := r.getMaxQueuePosition(ctx, backup.Namespace)
|
||||
lister, err := r.newQueuedBackupsLister(ctx, backup.Namespace)
|
||||
if err != nil {
|
||||
// error is logged in getMaxQueuePosition func
|
||||
// error is logged in newQueuedBackupsLister func
|
||||
return ctrl.Result{}, nil //nolint:nilerr // We want to return nil to avoid requeue
|
||||
}
|
||||
maxQueuePosition := r.getMaxQueuePosition(lister)
|
||||
original := backup.DeepCopy()
|
||||
backup.Status.Phase = velerov1api.BackupPhaseQueued
|
||||
backup.Status.QueuePosition = maxQueuePosition + 1
|
||||
@@ -310,11 +264,12 @@ func (r *backupQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request)
|
||||
case velerov1api.BackupPhaseQueued:
|
||||
// handle queued backup
|
||||
// Find backups ahead of this one (InProgress, ReadyToStart, or Queued with higher position)
|
||||
earlierBackups, runningCount, err := r.listEarlierBackups(ctx, backup.Namespace, backup.Status.QueuePosition)
|
||||
lister, err := r.newQueuedBackupsLister(ctx, backup.Namespace)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("error listing earlier backups")
|
||||
return ctrl.Result{}, nil
|
||||
// error is logged in newQueuedBackupsLister func
|
||||
return ctrl.Result{}, nil //nolint:nilerr // We want to return nil to avoid requeue
|
||||
}
|
||||
earlierBackups, runningCount := lister.earlierThan(backup.Status.QueuePosition)
|
||||
if runningCount >= r.concurrentBackups {
|
||||
log.Debugf("%v concurrent backups are already running, leaving %v queued", r.concurrentBackups, backup.Name)
|
||||
return ctrl.Result{}, nil
|
||||
@@ -341,11 +296,7 @@ func (r *backupQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request)
|
||||
return ctrl.Result{}, errors.Wrapf(err, "error updating Backup status to %s", backup.Status.Phase)
|
||||
}
|
||||
log.Debug("Updating queuePosition for remaining queued backups")
|
||||
queuedBackups, err := r.orderedQueuedBackups(ctx, backup)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("error listing queued backups")
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
queuedBackups := lister.orderedQueued()
|
||||
newQueuePos := 1
|
||||
for _, queuedBackup := range queuedBackups {
|
||||
if queuedBackup.Name != backup.Name {
|
||||
@@ -366,3 +317,65 @@ func (r *backupQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request)
|
||||
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// queuedBackupsLister manages a list of all backups Queued, ReadyToStart, or InProgress
|
||||
// with methods to return specific subsets as needed
|
||||
type queuedBackupsLister struct {
|
||||
backups *velerov1api.BackupList
|
||||
}
|
||||
|
||||
func (r *backupQueueReconciler) newQueuedBackupsLister(ctx context.Context, ns string) (*queuedBackupsLister, error) {
|
||||
backupList := &velerov1api.BackupList{}
|
||||
backups := []velerov1api.Backup{}
|
||||
if err := r.Client.List(ctx, backupList, &client.ListOptions{Namespace: ns}); err != nil {
|
||||
r.logger.WithError(err).Error("error listing backups")
|
||||
return nil, err
|
||||
}
|
||||
for _, backup := range backupList.Items {
|
||||
if backup.Status.Phase == velerov1api.BackupPhaseQueued ||
|
||||
backup.Status.Phase == velerov1api.BackupPhaseInProgress ||
|
||||
backup.Status.Phase == velerov1api.BackupPhaseReadyToStart {
|
||||
backups = append(backups, backup)
|
||||
}
|
||||
}
|
||||
backupList.Items = backups
|
||||
return &queuedBackupsLister{backupList}, nil
|
||||
}
|
||||
|
||||
func (l *queuedBackupsLister) earlierThan(queuePos int) ([]velerov1api.Backup, int){
|
||||
backups := []velerov1api.Backup{}
|
||||
runningCount := 0
|
||||
for _, backup := range l.backups.Items {
|
||||
// InProgress and ReadyToStart backups have QueuePosition==0
|
||||
if backup.Status.QueuePosition < queuePos {
|
||||
backups = append(backups, backup)
|
||||
}
|
||||
// InProgress and ReadyToStart backups count towards the concurrentBackups limit
|
||||
if backup.Status.Phase == velerov1api.BackupPhaseInProgress ||
|
||||
backup.Status.Phase == velerov1api.BackupPhaseReadyToStart {
|
||||
runningCount++
|
||||
}
|
||||
}
|
||||
return backups, runningCount
|
||||
}
|
||||
|
||||
func (l *queuedBackupsLister) queued() []velerov1api.Backup {
|
||||
backups := []velerov1api.Backup{}
|
||||
for _, backup := range l.backups.Items {
|
||||
if backup.Status.Phase == velerov1api.BackupPhaseQueued {
|
||||
backups = append(backups, backup)
|
||||
}
|
||||
}
|
||||
return backups
|
||||
}
|
||||
|
||||
func (l *queuedBackupsLister) orderedQueued() []velerov1api.Backup {
|
||||
var returnList []velerov1api.Backup
|
||||
orderedBackupList := queuePositionOrderFunc(l.backups).(*velerov1api.BackupList)
|
||||
for _, item := range orderedBackupList.Items {
|
||||
if item.Status.Phase == velerov1api.BackupPhaseQueued {
|
||||
returnList = append(returnList, item)
|
||||
}
|
||||
}
|
||||
return returnList
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user