diff --git a/changelogs/unreleased/5900-blackpiglet b/changelogs/unreleased/5900-blackpiglet new file mode 100644 index 000000000..229b6a19d --- /dev/null +++ b/changelogs/unreleased/5900-blackpiglet @@ -0,0 +1 @@ +Limit the concurrent number for backup's VolumeSnapshot operation. \ No newline at end of file diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 4cc98b98c..b88bc352d 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -109,6 +109,8 @@ const ( // defaultCredentialsDirectory is the path on disk where credential // files will be written to defaultCredentialsDirectory = "/tmp/credentials" + + defaultMaxConcurrentK8SConnections = 30 ) type serverConfig struct { @@ -131,6 +133,7 @@ type serverConfig struct { itemOperationSyncFrequency time.Duration defaultVolumesToFsBackup bool uploaderType string + maxConcurrentK8SConnections int } type controllerRunInfo struct { @@ -163,6 +166,7 @@ func NewCommand(f client.Factory) *cobra.Command { formatFlag: logging.NewFormatFlag(), defaultVolumesToFsBackup: podvolume.DefaultVolumesToFsBackup, uploaderType: uploader.ResticType, + maxConcurrentK8SConnections: defaultMaxConcurrentK8SConnections, } ) @@ -232,6 +236,7 @@ func NewCommand(f client.Factory) *cobra.Command { command.Flags().StringVar(&config.uploaderType, "uploader-type", config.uploaderType, "Type of uploader to handle the transfer of data of pod volumes") command.Flags().DurationVar(&config.defaultItemOperationTimeout, "default-item-operation-timeout", config.defaultItemOperationTimeout, "How long to wait on asynchronous BackupItemActions and RestoreItemActions to complete before timing out.") command.Flags().DurationVar(&config.resourceTimeout, "resource-timeout", config.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters. Default is 10 minutes.") + command.Flags().IntVar(&config.maxConcurrentK8SConnections, "max-concurrent-k8s-connections", config.maxConcurrentK8SConnections, "Max concurrent connections number that Velero can create with kube-apiserver. Default is 30.") return command } @@ -735,6 +740,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string s.csiSnapshotLister, s.csiSnapshotClient, s.credentialFileStore, + s.config.maxConcurrentK8SConnections, ).SetupWithManager(s.mgr); err != nil { s.logger.Fatal(err, "unable to create controller", "controller", controller.Backup) } diff --git a/pkg/controller/backup_controller.go b/pkg/controller/backup_controller.go index cc532683c..1f8d99619 100644 --- a/pkg/controller/backup_controller.go +++ b/pkg/controller/backup_controller.go @@ -91,6 +91,7 @@ type backupReconciler struct { volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister volumeSnapshotClient snapshotterClientSet.Interface credentialFileStore credentials.FileStore + maxConcurrentK8SConnections int } func NewBackupReconciler( @@ -115,6 +116,7 @@ func NewBackupReconciler( volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister, volumeSnapshotClient snapshotterClientSet.Interface, credentialStore credentials.FileStore, + maxConcurrentK8SConnections int, ) *backupReconciler { b := &backupReconciler{ @@ -140,6 +142,7 @@ func NewBackupReconciler( volumeSnapshotLister: volumeSnapshotLister, volumeSnapshotClient: volumeSnapshotClient, credentialFileStore: credentialStore, + maxConcurrentK8SConnections: maxConcurrentK8SConnections, } b.updateTotalBackupMetric() return b @@ -684,7 +687,7 @@ func (b *backupReconciler) runBackup(backup *pkgbackup.Request) error { // Delete the VolumeSnapshots created in the backup, when CSI feature is enabled. if len(volumeSnapshots) > 0 && len(volumeSnapshotContents) > 0 { - b.deleteVolumeSnapshot(volumeSnapshots, volumeSnapshotContents, backupLog) + b.deleteVolumeSnapshots(volumeSnapshots, volumeSnapshotContents, backupLog, b.maxConcurrentK8SConnections) } } @@ -986,77 +989,100 @@ func (b *backupReconciler) waitVolumeSnapshotReadyToUse(ctx context.Context, return result, err } -// deleteVolumeSnapshot delete VolumeSnapshot created during backup. +// deleteVolumeSnapshots delete VolumeSnapshot created during backup. // This is used to avoid deleting namespace in cluster triggers the VolumeSnapshot deletion, // which will cause snapshot deletion on cloud provider, then backup cannot restore the PV. // If DeletionPolicy is Retain, just delete it. If DeletionPolicy is Delete, need to // change DeletionPolicy to Retain before deleting VS, then change DeletionPolicy back to Delete. -func (b *backupReconciler) deleteVolumeSnapshot(volumeSnapshots []snapshotv1api.VolumeSnapshot, +func (b *backupReconciler) deleteVolumeSnapshots(volumeSnapshots []snapshotv1api.VolumeSnapshot, volumeSnapshotContents []snapshotv1api.VolumeSnapshotContent, - logger logrus.FieldLogger) { + logger logrus.FieldLogger, maxConcurrent int) { var wg sync.WaitGroup vscMap := make(map[string]snapshotv1api.VolumeSnapshotContent) for _, vsc := range volumeSnapshotContents { vscMap[vsc.Name] = vsc } - for _, vs := range volumeSnapshots { - wg.Add(1) - go func(vs snapshotv1api.VolumeSnapshot) { - defer wg.Done() - var vsc snapshotv1api.VolumeSnapshotContent - modifyVSCFlag := false - if vs.Status != nil && - vs.Status.BoundVolumeSnapshotContentName != nil && - len(*vs.Status.BoundVolumeSnapshotContentName) > 0 { - var found bool - if vsc, found = vscMap[*vs.Status.BoundVolumeSnapshotContentName]; !found { - logger.Errorf("Not find %s from the vscMap", *vs.Status.BoundVolumeSnapshotContentName) + ch := make(chan snapshotv1api.VolumeSnapshot, maxConcurrent) + defer func() { + if _, ok := <-ch; ok { + close(ch) + } + }() + + wg.Add(maxConcurrent) + for i := 0; i < maxConcurrent; i++ { + go func() { + for { + vs, ok := <-ch + if !ok { + wg.Done() return } - - if vsc.Spec.DeletionPolicy == snapshotv1api.VolumeSnapshotContentDelete { - modifyVSCFlag = true - } - } else { - logger.Errorf("VolumeSnapshot %s/%s is not ready. This is not expected.", vs.Namespace, vs.Name) + b.deleteVolumeSnapshot(vs, vscMap, logger) } - - // Change VolumeSnapshotContent's DeletionPolicy to Retain before deleting VolumeSnapshot, - // because VolumeSnapshotContent will be deleted by deleting VolumeSnapshot, when - // DeletionPolicy is set to Delete, but Velero needs VSC for cleaning snapshot on cloud - // in backup deletion. - if modifyVSCFlag { - logger.Debugf("Patching VolumeSnapshotContent %s", vsc.Name) - original := vsc.DeepCopy() - vsc.Spec.DeletionPolicy = snapshotv1api.VolumeSnapshotContentRetain - err := kubeutil.PatchResource(original, &vsc, b.kbClient) - if err != nil { - logger.Errorf("fail to modify VolumeSnapshotContent %s DeletionPolicy to Retain: %s", vsc.Name, err.Error()) - return - } - - defer func() { - logger.Debugf("Start to recreate VolumeSnapshotContent %s", vsc.Name) - err := b.recreateVolumeSnapshotContent(vsc) - if err != nil { - logger.Errorf("fail to recreate VolumeSnapshotContent %s: %s", vsc.Name, err.Error()) - } - }() - } - - // Delete VolumeSnapshot from cluster - logger.Debugf("Deleting VolumeSnapshot %s/%s", vs.Namespace, vs.Name) - err := b.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(vs.Namespace).Delete(b.ctx, vs.Name, metav1.DeleteOptions{}) - if err != nil { - logger.Errorf("fail to delete VolumeSnapshot %s/%s: %s", vs.Namespace, vs.Name, err.Error()) - } - }(vs) + }() } + for _, vs := range volumeSnapshots { + ch <- vs + } + close(ch) + wg.Wait() } +// deleteVolumeSnapshot is called by deleteVolumeSnapshots and handles the single VolumeSnapshot +// instance. +func (b *backupReconciler) deleteVolumeSnapshot(vs snapshotv1api.VolumeSnapshot, vscMap map[string]snapshotv1api.VolumeSnapshotContent, logger logrus.FieldLogger) { + var vsc snapshotv1api.VolumeSnapshotContent + modifyVSCFlag := false + if vs.Status != nil && + vs.Status.BoundVolumeSnapshotContentName != nil && + len(*vs.Status.BoundVolumeSnapshotContentName) > 0 { + var found bool + if vsc, found = vscMap[*vs.Status.BoundVolumeSnapshotContentName]; !found { + logger.Errorf("Not find %s from the vscMap", *vs.Status.BoundVolumeSnapshotContentName) + return + } + + if vsc.Spec.DeletionPolicy == snapshotv1api.VolumeSnapshotContentDelete { + modifyVSCFlag = true + } + } else { + logger.Errorf("VolumeSnapshot %s/%s is not ready. This is not expected.", vs.Namespace, vs.Name) + } + + // Change VolumeSnapshotContent's DeletionPolicy to Retain before deleting VolumeSnapshot, + // because VolumeSnapshotContent will be deleted by deleting VolumeSnapshot, when + // DeletionPolicy is set to Delete, but Velero needs VSC for cleaning snapshot on cloud + // in backup deletion. + if modifyVSCFlag { + logger.Debugf("Patching VolumeSnapshotContent %s", vsc.Name) + original := vsc.DeepCopy() + vsc.Spec.DeletionPolicy = snapshotv1api.VolumeSnapshotContentRetain + if err := b.kbClient.Patch(context.Background(), &vsc, kbclient.MergeFrom(original)); err != nil { + logger.Errorf("fail to modify VolumeSnapshotContent %s DeletionPolicy to Retain: %s", vsc.Name, err.Error()) + return + } + + defer func() { + logger.Debugf("Start to recreate VolumeSnapshotContent %s", vsc.Name) + err := b.recreateVolumeSnapshotContent(vsc) + if err != nil { + logger.Errorf("fail to recreate VolumeSnapshotContent %s: %s", vsc.Name, err.Error()) + } + }() + } + + // Delete VolumeSnapshot from cluster + logger.Debugf("Deleting VolumeSnapshot %s/%s", vs.Namespace, vs.Name) + err := b.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(vs.Namespace).Delete(context.TODO(), vs.Name, metav1.DeleteOptions{}) + if err != nil { + logger.Errorf("fail to delete VolumeSnapshot %s/%s: %s", vs.Namespace, vs.Name, err.Error()) + } +} + // recreateVolumeSnapshotContent will delete then re-create VolumeSnapshotContent, // because some parameter in VolumeSnapshotContent Spec is immutable, e.g. VolumeSnapshotRef // and Source. Source is updated to let csi-controller thinks the VSC is statically provsisioned with VS. diff --git a/pkg/controller/backup_controller_test.go b/pkg/controller/backup_controller_test.go index e171ddf06..48e34663c 100644 --- a/pkg/controller/backup_controller_test.go +++ b/pkg/controller/backup_controller_test.go @@ -1374,12 +1374,12 @@ func Test_getLastSuccessBySchedule(t *testing.T) { } } -func TestDeleteVolumeSnapshot(t *testing.T) { +func TestDeleteVolumeSnapshots(t *testing.T) { tests := []struct { name string vsArray []snapshotv1api.VolumeSnapshot vscArray []snapshotv1api.VolumeSnapshotContent - expectedVSArray []*snapshotv1api.VolumeSnapshot + expectedVSArray []snapshotv1api.VolumeSnapshot expectedVSCArray []snapshotv1api.VolumeSnapshotContent }{ { @@ -1390,19 +1390,38 @@ func TestDeleteVolumeSnapshot(t *testing.T) { vscArray: []snapshotv1api.VolumeSnapshotContent{ *builder.ForVolumeSnapshotContent("vsc1").DeletionPolicy(snapshotv1api.VolumeSnapshotContentDelete).Status().Result(), }, - expectedVSArray: []*snapshotv1api.VolumeSnapshot{}, + expectedVSArray: []snapshotv1api.VolumeSnapshot{}, expectedVSCArray: []snapshotv1api.VolumeSnapshotContent{ *builder.ForVolumeSnapshotContent("vsc1").DeletionPolicy(snapshotv1api.VolumeSnapshotContentRetain).VolumeSnapshotRef("ns-", "name-").Status().Result(), }, }, + { + name: "VS is ReadyToUse, and VS has corresponding VSC. Concurrent test.", + vsArray: []snapshotv1api.VolumeSnapshot{ + *builder.ForVolumeSnapshot("velero", "vs1").ObjectMeta(builder.WithLabels("testing-vs", "vs1")).Status().BoundVolumeSnapshotContentName("vsc1").Result(), + *builder.ForVolumeSnapshot("velero", "vs2").ObjectMeta(builder.WithLabels("testing-vs", "vs2")).Status().BoundVolumeSnapshotContentName("vsc2").Result(), + *builder.ForVolumeSnapshot("velero", "vs3").ObjectMeta(builder.WithLabels("testing-vs", "vs3")).Status().BoundVolumeSnapshotContentName("vsc3").Result(), + }, + vscArray: []snapshotv1api.VolumeSnapshotContent{ + *builder.ForVolumeSnapshotContent("vsc1").DeletionPolicy(snapshotv1api.VolumeSnapshotContentDelete).Status().Result(), + *builder.ForVolumeSnapshotContent("vsc2").DeletionPolicy(snapshotv1api.VolumeSnapshotContentDelete).Status().Result(), + *builder.ForVolumeSnapshotContent("vsc3").DeletionPolicy(snapshotv1api.VolumeSnapshotContentDelete).Status().Result(), + }, + expectedVSArray: []snapshotv1api.VolumeSnapshot{}, + expectedVSCArray: []snapshotv1api.VolumeSnapshotContent{ + *builder.ForVolumeSnapshotContent("vsc1").DeletionPolicy(snapshotv1api.VolumeSnapshotContentRetain).VolumeSnapshotRef("ns-", "name-").Status().Result(), + *builder.ForVolumeSnapshotContent("vsc2").DeletionPolicy(snapshotv1api.VolumeSnapshotContentRetain).VolumeSnapshotRef("ns-", "name-").Status().Result(), + *builder.ForVolumeSnapshotContent("vsc3").DeletionPolicy(snapshotv1api.VolumeSnapshotContentRetain).VolumeSnapshotRef("ns-", "name-").Status().Result(), + }, + }, { name: "Corresponding VSC not found for VS. VS is not deleted.", vsArray: []snapshotv1api.VolumeSnapshot{ *builder.ForVolumeSnapshot("velero", "vs1").ObjectMeta(builder.WithLabels("testing-vs", "vs1")).Status().BoundVolumeSnapshotContentName("vsc1").Result(), }, vscArray: []snapshotv1api.VolumeSnapshotContent{}, - expectedVSArray: []*snapshotv1api.VolumeSnapshot{ - builder.ForVolumeSnapshot("velero", "vs1").Status().BoundVolumeSnapshotContentName("vsc1").Result(), + expectedVSArray: []snapshotv1api.VolumeSnapshot{ + *builder.ForVolumeSnapshot("velero", "vs1").Status().BoundVolumeSnapshotContentName("vsc1").Result(), }, expectedVSCArray: []snapshotv1api.VolumeSnapshotContent{}, }, @@ -1414,7 +1433,7 @@ func TestDeleteVolumeSnapshot(t *testing.T) { vscArray: []snapshotv1api.VolumeSnapshotContent{ *builder.ForVolumeSnapshotContent("vsc1").DeletionPolicy(snapshotv1api.VolumeSnapshotContentDelete).Status().Result(), }, - expectedVSArray: []*snapshotv1api.VolumeSnapshot{}, + expectedVSArray: []snapshotv1api.VolumeSnapshot{}, expectedVSCArray: []snapshotv1api.VolumeSnapshotContent{ *builder.ForVolumeSnapshotContent("vsc1").DeletionPolicy(snapshotv1api.VolumeSnapshotContentDelete).Status().Result(), }, @@ -1439,11 +1458,13 @@ func TestDeleteVolumeSnapshot(t *testing.T) { for _, vs := range tc.vsArray { _, err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(vs.Namespace).Create(context.Background(), &vs, metav1.CreateOptions{}) require.NoError(t, err) + require.NoError(t, sharedInformers.Snapshot().V1().VolumeSnapshots().Informer().GetStore().Add(&vs)) } logger := logging.DefaultLogger(logrus.DebugLevel, logging.FormatText) - c.deleteVolumeSnapshot(tc.vsArray, tc.vscArray, logger) - vsList, err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots("velero").List(context.Background(), metav1.ListOptions{}) + c.deleteVolumeSnapshots(tc.vsArray, tc.vscArray, logger, 30) + + vsList, err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots("velero").List(context.TODO(), metav1.ListOptions{}) require.NoError(t, err) assert.Equal(t, len(tc.expectedVSArray), len(vsList.Items)) for index := range tc.expectedVSArray {