From 91357b28c4d3cdcb6af0c6ac841b5a867b705c92 Mon Sep 17 00:00:00 2001 From: Scott Seago Date: Mon, 29 Sep 2025 16:20:43 -0400 Subject: [PATCH] Move worker pool creation to backup reconcile. ItemBlockWorkerPool is now created for each backup. Signed-off-by: Scott Seago --- pkg/backup/backup.go | 2 +- pkg/backup/backup_test.go | 70 ++++++++++++------------ pkg/backup/request.go | 2 +- pkg/controller/backup_controller.go | 10 ++-- pkg/controller/backup_controller_test.go | 33 ++++------- 5 files changed, 52 insertions(+), 65 deletions(-) diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go index b0d0937dd..8d009de90 100644 --- a/pkg/backup/backup.go +++ b/pkg/backup/backup.go @@ -597,7 +597,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers( log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items)) wg.Add(1) - backupRequest.ItemBlockChannel <- ItemBlockInput{ + backupRequest.WorkerPool.GetInputChannel() <- ItemBlockInput{ itemBlock: itemBlock, returnChan: itemBlockReturn, } diff --git a/pkg/backup/backup_test.go b/pkg/backup/backup_test.go index 6918c99f5..1db3cff6b 100644 --- a/pkg/backup/backup_test.go +++ b/pkg/backup/backup_test.go @@ -79,7 +79,7 @@ func TestBackedUpItemsMatchesTarballContents(t *testing.T) { Backup: defaultBackup().Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: h.itemBlockPool.GetInputChannel(), + WorkerPool: &h.itemBlockPool, } backupFile := bytes.NewBuffer([]byte{}) @@ -141,7 +141,7 @@ func TestBackupProgressIsUpdated(t *testing.T) { Backup: defaultBackup().Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: h.itemBlockPool.GetInputChannel(), + WorkerPool: &h.itemBlockPool, } backupFile := bytes.NewBuffer([]byte{}) @@ -881,7 +881,7 @@ func TestBackupOldResourceFiltering(t *testing.T) { Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, } backupFile = bytes.NewBuffer([]byte{}) ) @@ -1062,7 +1062,7 @@ func TestCRDInclusion(t *testing.T) { Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, } backupFile = bytes.NewBuffer([]byte{}) ) @@ -1161,7 +1161,7 @@ func TestBackupResourceCohabitation(t *testing.T) { Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, } backupFile = bytes.NewBuffer([]byte{}) ) @@ -1190,7 +1190,7 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) { Backup: defaultBackup().Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: h.itemBlockPool.GetInputChannel(), + WorkerPool: &h.itemBlockPool, } backup1File := bytes.NewBuffer([]byte{}) @@ -1206,7 +1206,7 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) { Backup: defaultBackup().Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: h.itemBlockPool.GetInputChannel(), + WorkerPool: &h.itemBlockPool, } backup2File := bytes.NewBuffer([]byte{}) @@ -1260,7 +1260,7 @@ func TestBackupResourceOrdering(t *testing.T) { Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, } backupFile = bytes.NewBuffer([]byte{}) ) @@ -1381,7 +1381,7 @@ func TestBackupItemActionsForSkippedPV(t *testing.T) { Backup: defaultBackup().SnapshotVolumes(false).Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, }, resPolicies: &resourcepolicies.ResourcePolicies{ Version: "v1", @@ -1429,7 +1429,7 @@ func TestBackupItemActionsForSkippedPV(t *testing.T) { includedPVs: map[string]struct{}{}, }, BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, }, apiResources: []*test.APIResource{ test.PVCs( @@ -1679,7 +1679,7 @@ func TestBackupActionsRunForCorrectItems(t *testing.T) { Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, } backupFile = bytes.NewBuffer([]byte{}) ) @@ -1764,7 +1764,7 @@ func TestBackupWithInvalidActions(t *testing.T) { Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, } backupFile = bytes.NewBuffer([]byte{}) ) @@ -1918,7 +1918,7 @@ func TestBackupActionModifications(t *testing.T) { Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, } backupFile = bytes.NewBuffer([]byte{}) ) @@ -2178,7 +2178,7 @@ func TestBackupActionAdditionalItems(t *testing.T) { Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, } backupFile = bytes.NewBuffer([]byte{}) ) @@ -2439,7 +2439,7 @@ func TestItemBlockActionsRunForCorrectItems(t *testing.T) { Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, } backupFile = bytes.NewBuffer([]byte{}) ) @@ -2524,7 +2524,7 @@ func TestBackupWithInvalidItemBlockActions(t *testing.T) { Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, } backupFile = bytes.NewBuffer([]byte{}) ) @@ -2780,7 +2780,7 @@ func TestItemBlockActionRelatedItems(t *testing.T) { Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, } backupFile = bytes.NewBuffer([]byte{}) ) @@ -2948,7 +2948,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, }, apiResources: []*test.APIResource{ test.PVs( @@ -2984,7 +2984,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, }, apiResources: []*test.APIResource{ test.PVs( @@ -3021,7 +3021,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, }, apiResources: []*test.APIResource{ test.PVs( @@ -3058,7 +3058,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, }, apiResources: []*test.APIResource{ test.PVs( @@ -3095,7 +3095,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, }, apiResources: []*test.APIResource{ test.PVs( @@ -3130,7 +3130,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, }, apiResources: []*test.APIResource{ test.PVs( @@ -3148,7 +3148,7 @@ func TestBackupWithSnapshots(t *testing.T) { Backup: defaultBackup().Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, }, apiResources: []*test.APIResource{ test.PVs( @@ -3169,7 +3169,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, }, apiResources: []*test.APIResource{ test.PVs( @@ -3188,7 +3188,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, }, apiResources: []*test.APIResource{ test.PVs( @@ -3210,7 +3210,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, }, apiResources: []*test.APIResource{ test.PVs( @@ -3344,7 +3344,7 @@ func TestBackupWithAsyncOperations(t *testing.T) { Backup: defaultBackup().Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, }, apiResources: []*test.APIResource{ test.Pods( @@ -3376,7 +3376,7 @@ func TestBackupWithAsyncOperations(t *testing.T) { Backup: defaultBackup().Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, }, apiResources: []*test.APIResource{ test.Pods( @@ -3408,7 +3408,7 @@ func TestBackupWithAsyncOperations(t *testing.T) { Backup: defaultBackup().Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, }, apiResources: []*test.APIResource{ test.Pods( @@ -3494,7 +3494,7 @@ func TestBackupWithInvalidHooks(t *testing.T) { Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, } backupFile = bytes.NewBuffer([]byte{}) ) @@ -3968,7 +3968,7 @@ func TestBackupWithHooks(t *testing.T) { Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, } backupFile = bytes.NewBuffer([]byte{}) podCommandExecutor = new(test.MockPodCommandExecutor) @@ -4193,7 +4193,7 @@ func TestBackupWithPodVolume(t *testing.T) { SnapshotLocations: []*velerov1.VolumeSnapshotLocation{tc.vsl}, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, } backupFile = bytes.NewBuffer([]byte{}) ) @@ -5312,7 +5312,7 @@ func TestBackupNewResourceFiltering(t *testing.T) { Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, } backupFile = bytes.NewBuffer([]byte{}) ) @@ -5477,7 +5477,7 @@ func TestBackupNamespaces(t *testing.T) { Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), - ItemBlockChannel: itemBlockPool.GetInputChannel(), + WorkerPool: itemBlockPool, } backupFile = bytes.NewBuffer([]byte{}) ) diff --git a/pkg/backup/request.go b/pkg/backup/request.go index 4643142b1..b012992c0 100644 --- a/pkg/backup/request.go +++ b/pkg/backup/request.go @@ -69,7 +69,7 @@ type Request struct { ResPolicies *resourcepolicies.Policies SkippedPVTracker *skipPVTracker VolumesInformation volume.BackupVolumesInformation - ItemBlockChannel chan ItemBlockInput + WorkerPool *ItemBlockWorkerPool } // BackupVolumesInformation contains the information needs by generating diff --git a/pkg/controller/backup_controller.go b/pkg/controller/backup_controller.go index 90085fa1f..1d3b9b9eb 100644 --- a/pkg/controller/backup_controller.go +++ b/pkg/controller/backup_controller.go @@ -110,7 +110,6 @@ type backupReconciler struct { globalCRClient kbclient.Client itemBlockWorkerCount int concurrentBackups int - workerPool *pkgbackup.ItemBlockWorkerPool } func NewBackupReconciler( @@ -167,7 +166,6 @@ func NewBackupReconciler( itemBlockWorkerCount: itemBlockWorkerCount, concurrentBackups: max(concurrentBackups, 1), globalCRClient: globalCRClient, - workerPool: pkgbackup.StartItemBlockWorkerPool(ctx, itemBlockWorkerCount, logger), } b.updateTotalBackupMetric() return b @@ -289,7 +287,9 @@ func (b *backupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } log.Debug("Preparing backup request") - request := b.prepareBackupRequest(original, log) + request := b.prepareBackupRequest(ctx, original, log) + // delete worker pool after reconcile + defer request.WorkerPool.Stop() if len(request.Status.ValidationErrors) > 0 { request.Status.Phase = velerov1api.BackupPhaseFailedValidation } else { @@ -371,12 +371,12 @@ func (b *backupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, nil } -func (b *backupReconciler) prepareBackupRequest(backup *velerov1api.Backup, logger logrus.FieldLogger) *pkgbackup.Request { +func (b *backupReconciler) prepareBackupRequest(ctx context.Context, backup *velerov1api.Backup, logger logrus.FieldLogger) *pkgbackup.Request { request := &pkgbackup.Request{ Backup: backup.DeepCopy(), // don't modify items in the cache SkippedPVTracker: pkgbackup.NewSkipPVTracker(), BackedUpItems: pkgbackup.NewBackedUpItemsMap(), - ItemBlockChannel: b.workerPool.GetInputChannel(), + WorkerPool: pkgbackup.StartItemBlockWorkerPool(ctx, b.itemBlockWorkerCount, logger), } request.VolumesInformation.Init() diff --git a/pkg/controller/backup_controller_test.go b/pkg/controller/backup_controller_test.go index 524db4b24..e0b554ff1 100644 --- a/pkg/controller/backup_controller_test.go +++ b/pkg/controller/backup_controller_test.go @@ -149,9 +149,7 @@ func TestProcessBackupNonProcessedItems(t *testing.T) { kbClient: velerotest.NewFakeControllerRuntimeClient(t), formatFlag: formatFlag, logger: logger, - workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger), } - defer c.workerPool.Stop() if test.backup != nil { require.NoError(t, c.kbClient.Create(t.Context(), test.backup)) } @@ -248,9 +246,7 @@ func TestProcessBackupValidationFailures(t *testing.T) { clock: &clock.RealClock{}, formatFlag: formatFlag, metrics: metrics.NewServerMetrics(), - workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger), } - defer c.workerPool.Stop() require.NotNil(t, test.backup) require.NoError(t, c.kbClient.Create(t.Context(), test.backup)) @@ -313,11 +309,10 @@ func TestBackupLocationLabel(t *testing.T) { defaultBackupLocation: test.backupLocation.Name, clock: &clock.RealClock{}, formatFlag: formatFlag, - workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger), } - defer c.workerPool.Stop() - res := c.prepareBackupRequest(test.backup, logger) + res := c.prepareBackupRequest(ctx, test.backup, logger) + defer res.WorkerPool.Stop() assert.NotNil(t, res) assert.Equal(t, test.expectedBackupLocation, res.Labels[velerov1api.StorageLocationLabel]) }) @@ -410,14 +405,13 @@ func Test_prepareBackupRequest_BackupStorageLocation(t *testing.T) { defaultBackupTTL: defaultBackupTTL.Duration, clock: testclocks.NewFakeClock(now), formatFlag: formatFlag, - workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger), } - defer c.workerPool.Stop() test.backup.Spec.StorageLocation = test.backupLocationNameInBackup // Run - res := c.prepareBackupRequest(test.backup, logger) + res := c.prepareBackupRequest(ctx, test.backup, logger) + defer res.WorkerPool.Stop() // Assert if test.expectedSuccess { @@ -486,11 +480,10 @@ func TestDefaultBackupTTL(t *testing.T) { defaultBackupTTL: defaultBackupTTL.Duration, clock: testclocks.NewFakeClock(now), formatFlag: formatFlag, - workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger), } - defer c.workerPool.Stop() - res := c.prepareBackupRequest(test.backup, logger) + res := c.prepareBackupRequest(ctx, test.backup, logger) + defer res.WorkerPool.Stop() assert.NotNil(t, res) assert.Equal(t, test.expectedTTL, res.Spec.TTL) assert.Equal(t, test.expectedExpiration, *res.Status.Expiration) @@ -547,11 +540,10 @@ func TestPrepareBackupRequest_SetsVGSLabelKey(t *testing.T) { defaultVGSLabelKey: test.serverFlagKey, discoveryHelper: discoveryHelper, clock: testclocks.NewFakeClock(now), - workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger), } - defer c.workerPool.Stop() - res := c.prepareBackupRequest(test.backup, logger) + res := c.prepareBackupRequest(ctx, test.backup, logger) + defer res.WorkerPool.Stop() assert.NotNil(t, res) assert.Equal(t, test.expectedLabelKey, res.Spec.VolumeGroupSnapshotLabelKey) @@ -649,11 +641,10 @@ func TestDefaultVolumesToResticDeprecation(t *testing.T) { clock: &clock.RealClock{}, formatFlag: formatFlag, defaultVolumesToFsBackup: test.globalVal, - workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger), } - defer c.workerPool.Stop() - res := c.prepareBackupRequest(test.backup, logger) + res := c.prepareBackupRequest(ctx, test.backup, logger) + defer res.WorkerPool.Stop() assert.NotNil(t, res) assert.NotNil(t, res.Spec.DefaultVolumesToFsBackup) if test.expectRemap { @@ -1569,9 +1560,7 @@ func TestProcessBackupCompletions(t *testing.T) { backupper: backupper, formatFlag: formatFlag, globalCRClient: fakeGlobalClient, - workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger), } - defer c.workerPool.Stop() pluginManager.On("GetBackupItemActionsV2").Return(nil, nil) pluginManager.On("GetItemBlockActions").Return(nil, nil) @@ -1777,9 +1766,7 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) { logger: logger, defaultSnapshotLocations: test.defaultLocations, kbClient: velerotest.NewFakeControllerRuntimeClient(t), - workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger), } - defer c.workerPool.Stop() // set up a Backup object to represent what we expect to be passed to backupper.Backup() backup := test.backup.DeepCopy()