Move worker pool creation to backup reconcile.

ItemBlockWorkerPool is now created for each backup.

Signed-off-by: Scott Seago <sseago@redhat.com>
This commit is contained in:
Scott Seago
2025-09-29 16:20:43 -04:00
parent e0c08f03cf
commit 91357b28c4
5 changed files with 52 additions and 65 deletions

View File

@@ -597,7 +597,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items))
wg.Add(1)
backupRequest.ItemBlockChannel <- ItemBlockInput{
backupRequest.WorkerPool.GetInputChannel() <- ItemBlockInput{
itemBlock: itemBlock,
returnChan: itemBlockReturn,
}

View File

@@ -79,7 +79,7 @@ func TestBackedUpItemsMatchesTarballContents(t *testing.T) {
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: h.itemBlockPool.GetInputChannel(),
WorkerPool: &h.itemBlockPool,
}
backupFile := bytes.NewBuffer([]byte{})
@@ -141,7 +141,7 @@ func TestBackupProgressIsUpdated(t *testing.T) {
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: h.itemBlockPool.GetInputChannel(),
WorkerPool: &h.itemBlockPool,
}
backupFile := bytes.NewBuffer([]byte{})
@@ -881,7 +881,7 @@ func TestBackupOldResourceFiltering(t *testing.T) {
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -1062,7 +1062,7 @@ func TestCRDInclusion(t *testing.T) {
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -1161,7 +1161,7 @@ func TestBackupResourceCohabitation(t *testing.T) {
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -1190,7 +1190,7 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: h.itemBlockPool.GetInputChannel(),
WorkerPool: &h.itemBlockPool,
}
backup1File := bytes.NewBuffer([]byte{})
@@ -1206,7 +1206,7 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: h.itemBlockPool.GetInputChannel(),
WorkerPool: &h.itemBlockPool,
}
backup2File := bytes.NewBuffer([]byte{})
@@ -1260,7 +1260,7 @@ func TestBackupResourceOrdering(t *testing.T) {
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -1381,7 +1381,7 @@ func TestBackupItemActionsForSkippedPV(t *testing.T) {
Backup: defaultBackup().SnapshotVolumes(false).Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
},
resPolicies: &resourcepolicies.ResourcePolicies{
Version: "v1",
@@ -1429,7 +1429,7 @@ func TestBackupItemActionsForSkippedPV(t *testing.T) {
includedPVs: map[string]struct{}{},
},
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
},
apiResources: []*test.APIResource{
test.PVCs(
@@ -1679,7 +1679,7 @@ func TestBackupActionsRunForCorrectItems(t *testing.T) {
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -1764,7 +1764,7 @@ func TestBackupWithInvalidActions(t *testing.T) {
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -1918,7 +1918,7 @@ func TestBackupActionModifications(t *testing.T) {
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -2178,7 +2178,7 @@ func TestBackupActionAdditionalItems(t *testing.T) {
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -2439,7 +2439,7 @@ func TestItemBlockActionsRunForCorrectItems(t *testing.T) {
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -2524,7 +2524,7 @@ func TestBackupWithInvalidItemBlockActions(t *testing.T) {
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -2780,7 +2780,7 @@ func TestItemBlockActionRelatedItems(t *testing.T) {
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -2948,7 +2948,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
},
apiResources: []*test.APIResource{
test.PVs(
@@ -2984,7 +2984,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
},
apiResources: []*test.APIResource{
test.PVs(
@@ -3021,7 +3021,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
},
apiResources: []*test.APIResource{
test.PVs(
@@ -3058,7 +3058,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
},
apiResources: []*test.APIResource{
test.PVs(
@@ -3095,7 +3095,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
},
apiResources: []*test.APIResource{
test.PVs(
@@ -3130,7 +3130,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
},
apiResources: []*test.APIResource{
test.PVs(
@@ -3148,7 +3148,7 @@ func TestBackupWithSnapshots(t *testing.T) {
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
},
apiResources: []*test.APIResource{
test.PVs(
@@ -3169,7 +3169,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
},
apiResources: []*test.APIResource{
test.PVs(
@@ -3188,7 +3188,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
},
apiResources: []*test.APIResource{
test.PVs(
@@ -3210,7 +3210,7 @@ func TestBackupWithSnapshots(t *testing.T) {
},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
},
apiResources: []*test.APIResource{
test.PVs(
@@ -3344,7 +3344,7 @@ func TestBackupWithAsyncOperations(t *testing.T) {
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
},
apiResources: []*test.APIResource{
test.Pods(
@@ -3376,7 +3376,7 @@ func TestBackupWithAsyncOperations(t *testing.T) {
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
},
apiResources: []*test.APIResource{
test.Pods(
@@ -3408,7 +3408,7 @@ func TestBackupWithAsyncOperations(t *testing.T) {
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
},
apiResources: []*test.APIResource{
test.Pods(
@@ -3494,7 +3494,7 @@ func TestBackupWithInvalidHooks(t *testing.T) {
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -3968,7 +3968,7 @@ func TestBackupWithHooks(t *testing.T) {
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
}
backupFile = bytes.NewBuffer([]byte{})
podCommandExecutor = new(test.MockPodCommandExecutor)
@@ -4193,7 +4193,7 @@ func TestBackupWithPodVolume(t *testing.T) {
SnapshotLocations: []*velerov1.VolumeSnapshotLocation{tc.vsl},
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -5312,7 +5312,7 @@ func TestBackupNewResourceFiltering(t *testing.T) {
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -5477,7 +5477,7 @@ func TestBackupNamespaces(t *testing.T) {
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
BackedUpItems: NewBackedUpItemsMap(),
ItemBlockChannel: itemBlockPool.GetInputChannel(),
WorkerPool: itemBlockPool,
}
backupFile = bytes.NewBuffer([]byte{})
)

View File

@@ -69,7 +69,7 @@ type Request struct {
ResPolicies *resourcepolicies.Policies
SkippedPVTracker *skipPVTracker
VolumesInformation volume.BackupVolumesInformation
ItemBlockChannel chan ItemBlockInput
WorkerPool *ItemBlockWorkerPool
}
// BackupVolumesInformation contains the information needs by generating

View File

@@ -110,7 +110,6 @@ type backupReconciler struct {
globalCRClient kbclient.Client
itemBlockWorkerCount int
concurrentBackups int
workerPool *pkgbackup.ItemBlockWorkerPool
}
func NewBackupReconciler(
@@ -167,7 +166,6 @@ func NewBackupReconciler(
itemBlockWorkerCount: itemBlockWorkerCount,
concurrentBackups: max(concurrentBackups, 1),
globalCRClient: globalCRClient,
workerPool: pkgbackup.StartItemBlockWorkerPool(ctx, itemBlockWorkerCount, logger),
}
b.updateTotalBackupMetric()
return b
@@ -289,7 +287,9 @@ func (b *backupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}
log.Debug("Preparing backup request")
request := b.prepareBackupRequest(original, log)
request := b.prepareBackupRequest(ctx, original, log)
// delete worker pool after reconcile
defer request.WorkerPool.Stop()
if len(request.Status.ValidationErrors) > 0 {
request.Status.Phase = velerov1api.BackupPhaseFailedValidation
} else {
@@ -371,12 +371,12 @@ func (b *backupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return ctrl.Result{}, nil
}
func (b *backupReconciler) prepareBackupRequest(backup *velerov1api.Backup, logger logrus.FieldLogger) *pkgbackup.Request {
func (b *backupReconciler) prepareBackupRequest(ctx context.Context, backup *velerov1api.Backup, logger logrus.FieldLogger) *pkgbackup.Request {
request := &pkgbackup.Request{
Backup: backup.DeepCopy(), // don't modify items in the cache
SkippedPVTracker: pkgbackup.NewSkipPVTracker(),
BackedUpItems: pkgbackup.NewBackedUpItemsMap(),
ItemBlockChannel: b.workerPool.GetInputChannel(),
WorkerPool: pkgbackup.StartItemBlockWorkerPool(ctx, b.itemBlockWorkerCount, logger),
}
request.VolumesInformation.Init()

View File

@@ -149,9 +149,7 @@ func TestProcessBackupNonProcessedItems(t *testing.T) {
kbClient: velerotest.NewFakeControllerRuntimeClient(t),
formatFlag: formatFlag,
logger: logger,
workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger),
}
defer c.workerPool.Stop()
if test.backup != nil {
require.NoError(t, c.kbClient.Create(t.Context(), test.backup))
}
@@ -248,9 +246,7 @@ func TestProcessBackupValidationFailures(t *testing.T) {
clock: &clock.RealClock{},
formatFlag: formatFlag,
metrics: metrics.NewServerMetrics(),
workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger),
}
defer c.workerPool.Stop()
require.NotNil(t, test.backup)
require.NoError(t, c.kbClient.Create(t.Context(), test.backup))
@@ -313,11 +309,10 @@ func TestBackupLocationLabel(t *testing.T) {
defaultBackupLocation: test.backupLocation.Name,
clock: &clock.RealClock{},
formatFlag: formatFlag,
workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger),
}
defer c.workerPool.Stop()
res := c.prepareBackupRequest(test.backup, logger)
res := c.prepareBackupRequest(ctx, test.backup, logger)
defer res.WorkerPool.Stop()
assert.NotNil(t, res)
assert.Equal(t, test.expectedBackupLocation, res.Labels[velerov1api.StorageLocationLabel])
})
@@ -410,14 +405,13 @@ func Test_prepareBackupRequest_BackupStorageLocation(t *testing.T) {
defaultBackupTTL: defaultBackupTTL.Duration,
clock: testclocks.NewFakeClock(now),
formatFlag: formatFlag,
workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger),
}
defer c.workerPool.Stop()
test.backup.Spec.StorageLocation = test.backupLocationNameInBackup
// Run
res := c.prepareBackupRequest(test.backup, logger)
res := c.prepareBackupRequest(ctx, test.backup, logger)
defer res.WorkerPool.Stop()
// Assert
if test.expectedSuccess {
@@ -486,11 +480,10 @@ func TestDefaultBackupTTL(t *testing.T) {
defaultBackupTTL: defaultBackupTTL.Duration,
clock: testclocks.NewFakeClock(now),
formatFlag: formatFlag,
workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger),
}
defer c.workerPool.Stop()
res := c.prepareBackupRequest(test.backup, logger)
res := c.prepareBackupRequest(ctx, test.backup, logger)
defer res.WorkerPool.Stop()
assert.NotNil(t, res)
assert.Equal(t, test.expectedTTL, res.Spec.TTL)
assert.Equal(t, test.expectedExpiration, *res.Status.Expiration)
@@ -547,11 +540,10 @@ func TestPrepareBackupRequest_SetsVGSLabelKey(t *testing.T) {
defaultVGSLabelKey: test.serverFlagKey,
discoveryHelper: discoveryHelper,
clock: testclocks.NewFakeClock(now),
workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger),
}
defer c.workerPool.Stop()
res := c.prepareBackupRequest(test.backup, logger)
res := c.prepareBackupRequest(ctx, test.backup, logger)
defer res.WorkerPool.Stop()
assert.NotNil(t, res)
assert.Equal(t, test.expectedLabelKey, res.Spec.VolumeGroupSnapshotLabelKey)
@@ -649,11 +641,10 @@ func TestDefaultVolumesToResticDeprecation(t *testing.T) {
clock: &clock.RealClock{},
formatFlag: formatFlag,
defaultVolumesToFsBackup: test.globalVal,
workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger),
}
defer c.workerPool.Stop()
res := c.prepareBackupRequest(test.backup, logger)
res := c.prepareBackupRequest(ctx, test.backup, logger)
defer res.WorkerPool.Stop()
assert.NotNil(t, res)
assert.NotNil(t, res.Spec.DefaultVolumesToFsBackup)
if test.expectRemap {
@@ -1569,9 +1560,7 @@ func TestProcessBackupCompletions(t *testing.T) {
backupper: backupper,
formatFlag: formatFlag,
globalCRClient: fakeGlobalClient,
workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger),
}
defer c.workerPool.Stop()
pluginManager.On("GetBackupItemActionsV2").Return(nil, nil)
pluginManager.On("GetItemBlockActions").Return(nil, nil)
@@ -1777,9 +1766,7 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) {
logger: logger,
defaultSnapshotLocations: test.defaultLocations,
kbClient: velerotest.NewFakeControllerRuntimeClient(t),
workerPool: pkgbackup.StartItemBlockWorkerPool(t.Context(), 1, logger),
}
defer c.workerPool.Stop()
// set up a Backup object to represent what we expect to be passed to backupper.Backup()
backup := test.backup.DeepCopy()