Track running backup count via BackupTracker

This avoids an unnecessary apiserver List call when the backup reconciler is already at capacity.

Signed-off-by: Scott Seago <sseago@redhat.com>
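To make the tracker's bookkeeping concrete, here is a minimal standalone sketch of the three-set design the diff below introduces: ReadyToStart, InProgress, and post-InProgress backups are kept in separate sets, and RunningCount covers the first two. The sketch uses plain Go maps and a sync.RWMutex instead of Velero's k8s.io/apimachinery sets, and the names trackerSketch, newTrackerSketch, and key are illustrative only; the actual implementation is in the final hunk of this diff.

```go
package main

import (
	"fmt"
	"sync"
)

// trackerSketch mirrors the three-set design from the diff: a backup key lives
// in at most one of the three sets, according to its current phase.
type trackerSketch struct {
	lock         sync.RWMutex
	readyToStart map[string]struct{}
	inProgress   map[string]struct{}
	postProgress map[string]struct{}
}

func newTrackerSketch() *trackerSketch {
	return &trackerSketch{
		readyToStart: map[string]struct{}{},
		inProgress:   map[string]struct{}{},
		postProgress: map[string]struct{}{},
	}
}

func key(ns, name string) string { return ns + "/" + name }

// AddReadyToStart records a backup the queue controller has released to run.
func (t *trackerSketch) AddReadyToStart(ns, name string) {
	t.lock.Lock()
	defer t.lock.Unlock()
	t.readyToStart[key(ns, name)] = struct{}{}
}

// Add moves a backup from ReadyToStart to InProgress.
func (t *trackerSketch) Add(ns, name string) {
	t.lock.Lock()
	defer t.lock.Unlock()
	k := key(ns, name)
	delete(t.readyToStart, k)
	t.inProgress[k] = struct{}{}
}

// AddPostProcessing marks a backup as past InProgress (waiting for plugin
// operations or finalizing); it no longer counts toward the concurrency limit.
func (t *trackerSketch) AddPostProcessing(ns, name string) {
	t.lock.Lock()
	defer t.lock.Unlock()
	k := key(ns, name)
	delete(t.readyToStart, k)
	delete(t.inProgress, k)
	t.postProgress[k] = struct{}{}
}

// Delete removes a backup that has reached a terminal phase.
func (t *trackerSketch) Delete(ns, name string) {
	t.lock.Lock()
	defer t.lock.Unlock()
	k := key(ns, name)
	delete(t.readyToStart, k)
	delete(t.inProgress, k)
	delete(t.postProgress, k)
}

// RunningCount is what the queue controller checks against concurrentBackups,
// instead of counting phases from a freshly listed set of backups.
func (t *trackerSketch) RunningCount() int {
	t.lock.RLock()
	defer t.lock.RUnlock()
	return len(t.readyToStart) + len(t.inProgress)
}

func main() {
	t := newTrackerSketch()
	t.AddReadyToStart("velero", "backup-1") // queue controller releases it
	t.Add("velero", "backup-1")             // backup controller starts it
	fmt.Println(t.RunningCount())           // 1: still counts toward the limit
	t.AddPostProcessing("velero", "backup-1")
	fmt.Println(t.RunningCount())           // 0: post-processing frees a slot
	t.Delete("velero", "backup-1")          // terminal phase
}
```

Keeping ReadyToStart and InProgress in separate sets lets RunningCount enforce the concurrency limit from memory, while Contains still answers the narrower question of whether a backup is actively running or post-processing.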
@@ -917,6 +917,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
 		s.mgr.GetScheme(),
 		s.logger,
 		s.config.ConcurrentBackups,
+		backupTracker,
 	).SetupWithManager(s.mgr); err != nil {
 		s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerBackupQueue)
 	}
@@ -323,6 +323,8 @@ func (b *backupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
 		switch request.Status.Phase {
 		case velerov1api.BackupPhaseCompleted, velerov1api.BackupPhasePartiallyFailed, velerov1api.BackupPhaseFailed, velerov1api.BackupPhaseFailedValidation:
 			b.backupTracker.Delete(request.Namespace, request.Name)
+		case velerov1api.BackupPhaseWaitingForPluginOperations, velerov1api.BackupPhaseWaitingForPluginOperationsPartiallyFailed, velerov1api.BackupPhaseFinalizing, velerov1api.BackupPhaseFinalizingPartiallyFailed:
+			b.backupTracker.AddPostProcessing(request.Namespace, request.Name)
 		}
 	}()
@@ -47,6 +47,7 @@ type backupQueueReconciler struct {
 	Scheme            *runtime.Scheme
 	logger            logrus.FieldLogger
 	concurrentBackups int
+	backupTracker     BackupTracker
 	frequency         time.Duration
 }
@@ -60,12 +61,14 @@ func NewBackupQueueReconciler(
 	scheme *runtime.Scheme,
 	logger logrus.FieldLogger,
 	concurrentBackups int,
+	backupTracker BackupTracker,
 ) *backupQueueReconciler {
 	return &backupQueueReconciler{
 		Client:            client,
 		Scheme:            scheme,
 		logger:            logger,
 		concurrentBackups: max(concurrentBackups, 1),
+		backupTracker:     backupTracker,
 		frequency:         defaultQueuedBackupRecheckFrequency,
 	}
 }
@@ -261,11 +264,11 @@ func (r *backupQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		return ctrl.Result{}, nil
 	}
 	lister := r.newQueuedBackupsLister(allBackups)
-	earlierBackups, runningCount := lister.earlierThan(backup.Status.QueuePosition)
-	if runningCount >= r.concurrentBackups {
+	if r.backupTracker.RunningCount() >= r.concurrentBackups {
 		log.Debugf("%v concurrent backups are already running, leaving %v queued", r.concurrentBackups, backup.Name)
 		return ctrl.Result{}, nil
 	}
+	earlierBackups := lister.earlierThan(backup.Status.QueuePosition)
 	foundConflict, conflictBackup, clusterNamespaces, err := r.detectNamespaceConflict(ctx, backup, earlierBackups)
 	if err != nil {
 		log.WithError(err).Error("error listing namespaces")
@@ -287,6 +290,7 @@ func (r *backupQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 	if err := kube.PatchResource(original, backup, r.Client); err != nil {
 		return ctrl.Result{}, errors.Wrapf(err, "error updating Backup status to %s", backup.Status.Phase)
 	}
+	r.backupTracker.AddReadyToStart(backup.Namespace, backup.Name)
 	log.Debug("Updating queuePosition for remaining queued backups")
 	queuedBackups := lister.orderedQueued()
 	newQueuePos := 1
@@ -329,21 +333,15 @@ func (r *backupQueueReconciler) newQueuedBackupsLister(backupList *velerov1api.B
 	return &queuedBackupsLister{backupList}
 }

-func (l *queuedBackupsLister) earlierThan(queuePos int) ([]velerov1api.Backup, int) {
+func (l *queuedBackupsLister) earlierThan(queuePos int) []velerov1api.Backup {
 	backups := []velerov1api.Backup{}
-	runningCount := 0
 	for _, backup := range l.backups.Items {
 		// InProgress and ReadyToStart backups have QueuePosition==0
 		if backup.Status.QueuePosition < queuePos {
 			backups = append(backups, backup)
 		}
-		// InProgress and ReadyToStart backups count towards the concurrentBackups limit
-		if backup.Status.Phase == velerov1api.BackupPhaseInProgress ||
-			backup.Status.Phase == velerov1api.BackupPhaseReadyToStart {
-			runningCount++
-		}
 	}
-	return backups, runningCount
+	return backups
 }

 func (l *queuedBackupsLister) orderedQueued() []velerov1api.Backup {
@@ -202,9 +202,15 @@ func TestBackupQueueReconciler(t *testing.T) {
 				return
 			}

+			backupTracker := NewBackupTracker()
 			initObjs := []runtime.Object{}
 			for _, priorBackup := range test.priorBackups {
 				initObjs = append(initObjs, priorBackup)
+				if priorBackup.Status.Phase == velerov1api.BackupPhaseReadyToStart {
+					backupTracker.AddReadyToStart(priorBackup.Namespace, priorBackup.Name)
+				} else if priorBackup.Status.Phase == velerov1api.BackupPhaseInProgress {
+					backupTracker.Add(priorBackup.Namespace, priorBackup.Name)
+				}
 			}
 			for _, ns := range test.namespaces {
 				initObjs = append(initObjs, builder.ForNamespace(ns).Result())
@@ -214,7 +220,7 @@ func TestBackupQueueReconciler(t *testing.T) {
 			fakeClient := velerotest.NewFakeControllerRuntimeClient(t, initObjs...)
 			logger := logrus.New()
 			log := logger.WithField("controller", "backup-queue-test")
-			r := NewBackupQueueReconciler(fakeClient, scheme, log, test.concurrentBackups)
+			r := NewBackupQueueReconciler(fakeClient, scheme, log, test.concurrentBackups, backupTracker)
 			req := ctrl.Request{NamespacedName: types.NamespacedName{Namespace: test.backup.Namespace, Name: test.backup.Name}}
 			res, err := r.Reconcile(t.Context(), req)
 			gotErr := err != nil
@@ -25,45 +25,90 @@ import (

 // BackupTracker keeps track of in-progress backups.
 type BackupTracker interface {
+	// AddReadyToStart informs the tracker that a backup is ReadyToStart.
+	AddReadyToStart(ns, name string)
 	// Add informs the tracker that a backup is in progress.
 	Add(ns, name string)
-	// Delete informs the tracker that a backup is no longer in progress.
+	// AddPostProcessing informs the tracker that a backup has moved beyond InProgress.
+	AddPostProcessing(ns, name string)
+	// Delete informs the tracker that a backup has reached a terminal state.
 	Delete(ns, name string)
-	// Contains returns true if the tracker is tracking the backup.
+	// Contains returns true if the backup is InProgress or post-InProgress.
 	Contains(ns, name string) bool
+	// RunningCount returns the number of backups which are ReadyToStart or InProgress.
+	RunningCount() int
 }

 type backupTracker struct {
 	lock sync.RWMutex
-	backups sets.Set[string]
+	readyToStartBackups sets.Set[string]
+	inProgressBackups   sets.Set[string]
+	postProgressBackups sets.Set[string]
 }

 // NewBackupTracker returns a new BackupTracker.
 func NewBackupTracker() BackupTracker {
 	return &backupTracker{
-		backups: sets.New[string](),
+		readyToStartBackups: sets.New[string](),
+		inProgressBackups:   sets.New[string](),
+		postProgressBackups: sets.New[string](),
 	}
 }

+func (bt *backupTracker) AddReadyToStart(ns, name string) {
+	bt.lock.Lock()
+	defer bt.lock.Unlock()
+
+	bt.readyToStartBackups.Insert(backupTrackerKey(ns, name))
+}
+
 func (bt *backupTracker) Add(ns, name string) {
 	bt.lock.Lock()
 	defer bt.lock.Unlock()

-	bt.backups.Insert(backupTrackerKey(ns, name))
+	key := backupTrackerKey(ns, name)
+	bt.readyToStartBackups.Delete(key)
+	bt.inProgressBackups.Insert(key)
+}
+
+func (bt *backupTracker) AddPostProcessing(ns, name string) {
+	bt.lock.Lock()
+	defer bt.lock.Unlock()
+
+	key := backupTrackerKey(ns, name)
+	bt.readyToStartBackups.Delete(key)
+	bt.inProgressBackups.Delete(key)
+	bt.postProgressBackups.Insert(key)
 }

 func (bt *backupTracker) Delete(ns, name string) {
 	bt.lock.Lock()
 	defer bt.lock.Unlock()

-	bt.backups.Delete(backupTrackerKey(ns, name))
+	key := backupTrackerKey(ns, name)
+	bt.readyToStartBackups.Delete(key)
+	bt.inProgressBackups.Delete(key)
+	bt.postProgressBackups.Delete(key)
 }

+// Contains returns true if the backup is InProgress or post-InProgress.
+// It ignores ReadyToStart backups, since Contains is used to determine whether
+// a backup is in progress and thus cannot be deleted yet.
 func (bt *backupTracker) Contains(ns, name string) bool {
 	bt.lock.RLock()
 	defer bt.lock.RUnlock()

-	return bt.backups.Has(backupTrackerKey(ns, name))
+	key := backupTrackerKey(ns, name)
+	return bt.inProgressBackups.Has(key) || bt.postProgressBackups.Has(key)
+}
+
+// RunningCount returns the number of backups which are ReadyToStart or InProgress.
+// It is used by the queue controller to determine whether a new backup can be started.
+func (bt *backupTracker) RunningCount() int {
+	bt.lock.RLock()
+	defer bt.lock.RUnlock()
+
+	return bt.inProgressBackups.Len() + bt.readyToStartBackups.Len()
 }

 func backupTrackerKey(ns, name string) string {