mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-01-07 05:46:37 +00:00
Merge pull request #5900 from blackpiglet/5416_limit_concurrent_goroutine
Limit the concurrent number for backup's VolumeSnapshot operation
This commit is contained in:
@@ -91,6 +91,7 @@ type backupReconciler struct {
|
||||
volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister
|
||||
volumeSnapshotClient snapshotterClientSet.Interface
|
||||
credentialFileStore credentials.FileStore
|
||||
maxConcurrentK8SConnections int
|
||||
}
|
||||
|
||||
func NewBackupReconciler(
|
||||
@@ -115,6 +116,7 @@ func NewBackupReconciler(
|
||||
volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister,
|
||||
volumeSnapshotClient snapshotterClientSet.Interface,
|
||||
credentialStore credentials.FileStore,
|
||||
maxConcurrentK8SConnections int,
|
||||
) *backupReconciler {
|
||||
|
||||
b := &backupReconciler{
|
||||
@@ -140,6 +142,7 @@ func NewBackupReconciler(
|
||||
volumeSnapshotLister: volumeSnapshotLister,
|
||||
volumeSnapshotClient: volumeSnapshotClient,
|
||||
credentialFileStore: credentialStore,
|
||||
maxConcurrentK8SConnections: maxConcurrentK8SConnections,
|
||||
}
|
||||
b.updateTotalBackupMetric()
|
||||
return b
|
||||
@@ -684,7 +687,7 @@ func (b *backupReconciler) runBackup(backup *pkgbackup.Request) error {
|
||||
|
||||
// Delete the VolumeSnapshots created in the backup, when CSI feature is enabled.
|
||||
if len(volumeSnapshots) > 0 && len(volumeSnapshotContents) > 0 {
|
||||
b.deleteVolumeSnapshot(volumeSnapshots, volumeSnapshotContents, backupLog)
|
||||
b.deleteVolumeSnapshots(volumeSnapshots, volumeSnapshotContents, backupLog, b.maxConcurrentK8SConnections)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -986,77 +989,100 @@ func (b *backupReconciler) waitVolumeSnapshotReadyToUse(ctx context.Context,
|
||||
return result, err
|
||||
}
|
||||
|
||||
// deleteVolumeSnapshot delete VolumeSnapshot created during backup.
|
||||
// deleteVolumeSnapshots delete VolumeSnapshot created during backup.
|
||||
// This is used to avoid deleting namespace in cluster triggers the VolumeSnapshot deletion,
|
||||
// which will cause snapshot deletion on cloud provider, then backup cannot restore the PV.
|
||||
// If DeletionPolicy is Retain, just delete it. If DeletionPolicy is Delete, need to
|
||||
// change DeletionPolicy to Retain before deleting VS, then change DeletionPolicy back to Delete.
|
||||
func (b *backupReconciler) deleteVolumeSnapshot(volumeSnapshots []snapshotv1api.VolumeSnapshot,
|
||||
func (b *backupReconciler) deleteVolumeSnapshots(volumeSnapshots []snapshotv1api.VolumeSnapshot,
|
||||
volumeSnapshotContents []snapshotv1api.VolumeSnapshotContent,
|
||||
logger logrus.FieldLogger) {
|
||||
logger logrus.FieldLogger, maxConcurrent int) {
|
||||
var wg sync.WaitGroup
|
||||
vscMap := make(map[string]snapshotv1api.VolumeSnapshotContent)
|
||||
for _, vsc := range volumeSnapshotContents {
|
||||
vscMap[vsc.Name] = vsc
|
||||
}
|
||||
|
||||
for _, vs := range volumeSnapshots {
|
||||
wg.Add(1)
|
||||
go func(vs snapshotv1api.VolumeSnapshot) {
|
||||
defer wg.Done()
|
||||
var vsc snapshotv1api.VolumeSnapshotContent
|
||||
modifyVSCFlag := false
|
||||
if vs.Status != nil &&
|
||||
vs.Status.BoundVolumeSnapshotContentName != nil &&
|
||||
len(*vs.Status.BoundVolumeSnapshotContentName) > 0 {
|
||||
var found bool
|
||||
if vsc, found = vscMap[*vs.Status.BoundVolumeSnapshotContentName]; !found {
|
||||
logger.Errorf("Not find %s from the vscMap", *vs.Status.BoundVolumeSnapshotContentName)
|
||||
ch := make(chan snapshotv1api.VolumeSnapshot, maxConcurrent)
|
||||
defer func() {
|
||||
if _, ok := <-ch; ok {
|
||||
close(ch)
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(maxConcurrent)
|
||||
for i := 0; i < maxConcurrent; i++ {
|
||||
go func() {
|
||||
for {
|
||||
vs, ok := <-ch
|
||||
if !ok {
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
|
||||
if vsc.Spec.DeletionPolicy == snapshotv1api.VolumeSnapshotContentDelete {
|
||||
modifyVSCFlag = true
|
||||
}
|
||||
} else {
|
||||
logger.Errorf("VolumeSnapshot %s/%s is not ready. This is not expected.", vs.Namespace, vs.Name)
|
||||
b.deleteVolumeSnapshot(vs, vscMap, logger)
|
||||
}
|
||||
|
||||
// Change VolumeSnapshotContent's DeletionPolicy to Retain before deleting VolumeSnapshot,
|
||||
// because VolumeSnapshotContent will be deleted by deleting VolumeSnapshot, when
|
||||
// DeletionPolicy is set to Delete, but Velero needs VSC for cleaning snapshot on cloud
|
||||
// in backup deletion.
|
||||
if modifyVSCFlag {
|
||||
logger.Debugf("Patching VolumeSnapshotContent %s", vsc.Name)
|
||||
original := vsc.DeepCopy()
|
||||
vsc.Spec.DeletionPolicy = snapshotv1api.VolumeSnapshotContentRetain
|
||||
err := kubeutil.PatchResource(original, &vsc, b.kbClient)
|
||||
if err != nil {
|
||||
logger.Errorf("fail to modify VolumeSnapshotContent %s DeletionPolicy to Retain: %s", vsc.Name, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
logger.Debugf("Start to recreate VolumeSnapshotContent %s", vsc.Name)
|
||||
err := b.recreateVolumeSnapshotContent(vsc)
|
||||
if err != nil {
|
||||
logger.Errorf("fail to recreate VolumeSnapshotContent %s: %s", vsc.Name, err.Error())
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Delete VolumeSnapshot from cluster
|
||||
logger.Debugf("Deleting VolumeSnapshot %s/%s", vs.Namespace, vs.Name)
|
||||
err := b.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(vs.Namespace).Delete(b.ctx, vs.Name, metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
logger.Errorf("fail to delete VolumeSnapshot %s/%s: %s", vs.Namespace, vs.Name, err.Error())
|
||||
}
|
||||
}(vs)
|
||||
}()
|
||||
}
|
||||
|
||||
for _, vs := range volumeSnapshots {
|
||||
ch <- vs
|
||||
}
|
||||
close(ch)
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// deleteVolumeSnapshot is called by deleteVolumeSnapshots and handles the single VolumeSnapshot
|
||||
// instance.
|
||||
func (b *backupReconciler) deleteVolumeSnapshot(vs snapshotv1api.VolumeSnapshot, vscMap map[string]snapshotv1api.VolumeSnapshotContent, logger logrus.FieldLogger) {
|
||||
var vsc snapshotv1api.VolumeSnapshotContent
|
||||
modifyVSCFlag := false
|
||||
if vs.Status != nil &&
|
||||
vs.Status.BoundVolumeSnapshotContentName != nil &&
|
||||
len(*vs.Status.BoundVolumeSnapshotContentName) > 0 {
|
||||
var found bool
|
||||
if vsc, found = vscMap[*vs.Status.BoundVolumeSnapshotContentName]; !found {
|
||||
logger.Errorf("Not find %s from the vscMap", *vs.Status.BoundVolumeSnapshotContentName)
|
||||
return
|
||||
}
|
||||
|
||||
if vsc.Spec.DeletionPolicy == snapshotv1api.VolumeSnapshotContentDelete {
|
||||
modifyVSCFlag = true
|
||||
}
|
||||
} else {
|
||||
logger.Errorf("VolumeSnapshot %s/%s is not ready. This is not expected.", vs.Namespace, vs.Name)
|
||||
}
|
||||
|
||||
// Change VolumeSnapshotContent's DeletionPolicy to Retain before deleting VolumeSnapshot,
|
||||
// because VolumeSnapshotContent will be deleted by deleting VolumeSnapshot, when
|
||||
// DeletionPolicy is set to Delete, but Velero needs VSC for cleaning snapshot on cloud
|
||||
// in backup deletion.
|
||||
if modifyVSCFlag {
|
||||
logger.Debugf("Patching VolumeSnapshotContent %s", vsc.Name)
|
||||
original := vsc.DeepCopy()
|
||||
vsc.Spec.DeletionPolicy = snapshotv1api.VolumeSnapshotContentRetain
|
||||
if err := b.kbClient.Patch(context.Background(), &vsc, kbclient.MergeFrom(original)); err != nil {
|
||||
logger.Errorf("fail to modify VolumeSnapshotContent %s DeletionPolicy to Retain: %s", vsc.Name, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
logger.Debugf("Start to recreate VolumeSnapshotContent %s", vsc.Name)
|
||||
err := b.recreateVolumeSnapshotContent(vsc)
|
||||
if err != nil {
|
||||
logger.Errorf("fail to recreate VolumeSnapshotContent %s: %s", vsc.Name, err.Error())
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Delete VolumeSnapshot from cluster
|
||||
logger.Debugf("Deleting VolumeSnapshot %s/%s", vs.Namespace, vs.Name)
|
||||
err := b.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(vs.Namespace).Delete(context.TODO(), vs.Name, metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
logger.Errorf("fail to delete VolumeSnapshot %s/%s: %s", vs.Namespace, vs.Name, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// recreateVolumeSnapshotContent will delete then re-create VolumeSnapshotContent,
|
||||
// because some parameter in VolumeSnapshotContent Spec is immutable, e.g. VolumeSnapshotRef
|
||||
// and Source. Source is updated to let csi-controller thinks the VSC is statically provsisioned with VS.
|
||||
|
||||
@@ -1374,12 +1374,12 @@ func Test_getLastSuccessBySchedule(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteVolumeSnapshot(t *testing.T) {
|
||||
func TestDeleteVolumeSnapshots(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
vsArray []snapshotv1api.VolumeSnapshot
|
||||
vscArray []snapshotv1api.VolumeSnapshotContent
|
||||
expectedVSArray []*snapshotv1api.VolumeSnapshot
|
||||
expectedVSArray []snapshotv1api.VolumeSnapshot
|
||||
expectedVSCArray []snapshotv1api.VolumeSnapshotContent
|
||||
}{
|
||||
{
|
||||
@@ -1390,19 +1390,38 @@ func TestDeleteVolumeSnapshot(t *testing.T) {
|
||||
vscArray: []snapshotv1api.VolumeSnapshotContent{
|
||||
*builder.ForVolumeSnapshotContent("vsc1").DeletionPolicy(snapshotv1api.VolumeSnapshotContentDelete).Status().Result(),
|
||||
},
|
||||
expectedVSArray: []*snapshotv1api.VolumeSnapshot{},
|
||||
expectedVSArray: []snapshotv1api.VolumeSnapshot{},
|
||||
expectedVSCArray: []snapshotv1api.VolumeSnapshotContent{
|
||||
*builder.ForVolumeSnapshotContent("vsc1").DeletionPolicy(snapshotv1api.VolumeSnapshotContentRetain).VolumeSnapshotRef("ns-", "name-").Status().Result(),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "VS is ReadyToUse, and VS has corresponding VSC. Concurrent test.",
|
||||
vsArray: []snapshotv1api.VolumeSnapshot{
|
||||
*builder.ForVolumeSnapshot("velero", "vs1").ObjectMeta(builder.WithLabels("testing-vs", "vs1")).Status().BoundVolumeSnapshotContentName("vsc1").Result(),
|
||||
*builder.ForVolumeSnapshot("velero", "vs2").ObjectMeta(builder.WithLabels("testing-vs", "vs2")).Status().BoundVolumeSnapshotContentName("vsc2").Result(),
|
||||
*builder.ForVolumeSnapshot("velero", "vs3").ObjectMeta(builder.WithLabels("testing-vs", "vs3")).Status().BoundVolumeSnapshotContentName("vsc3").Result(),
|
||||
},
|
||||
vscArray: []snapshotv1api.VolumeSnapshotContent{
|
||||
*builder.ForVolumeSnapshotContent("vsc1").DeletionPolicy(snapshotv1api.VolumeSnapshotContentDelete).Status().Result(),
|
||||
*builder.ForVolumeSnapshotContent("vsc2").DeletionPolicy(snapshotv1api.VolumeSnapshotContentDelete).Status().Result(),
|
||||
*builder.ForVolumeSnapshotContent("vsc3").DeletionPolicy(snapshotv1api.VolumeSnapshotContentDelete).Status().Result(),
|
||||
},
|
||||
expectedVSArray: []snapshotv1api.VolumeSnapshot{},
|
||||
expectedVSCArray: []snapshotv1api.VolumeSnapshotContent{
|
||||
*builder.ForVolumeSnapshotContent("vsc1").DeletionPolicy(snapshotv1api.VolumeSnapshotContentRetain).VolumeSnapshotRef("ns-", "name-").Status().Result(),
|
||||
*builder.ForVolumeSnapshotContent("vsc2").DeletionPolicy(snapshotv1api.VolumeSnapshotContentRetain).VolumeSnapshotRef("ns-", "name-").Status().Result(),
|
||||
*builder.ForVolumeSnapshotContent("vsc3").DeletionPolicy(snapshotv1api.VolumeSnapshotContentRetain).VolumeSnapshotRef("ns-", "name-").Status().Result(),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Corresponding VSC not found for VS. VS is not deleted.",
|
||||
vsArray: []snapshotv1api.VolumeSnapshot{
|
||||
*builder.ForVolumeSnapshot("velero", "vs1").ObjectMeta(builder.WithLabels("testing-vs", "vs1")).Status().BoundVolumeSnapshotContentName("vsc1").Result(),
|
||||
},
|
||||
vscArray: []snapshotv1api.VolumeSnapshotContent{},
|
||||
expectedVSArray: []*snapshotv1api.VolumeSnapshot{
|
||||
builder.ForVolumeSnapshot("velero", "vs1").Status().BoundVolumeSnapshotContentName("vsc1").Result(),
|
||||
expectedVSArray: []snapshotv1api.VolumeSnapshot{
|
||||
*builder.ForVolumeSnapshot("velero", "vs1").Status().BoundVolumeSnapshotContentName("vsc1").Result(),
|
||||
},
|
||||
expectedVSCArray: []snapshotv1api.VolumeSnapshotContent{},
|
||||
},
|
||||
@@ -1414,7 +1433,7 @@ func TestDeleteVolumeSnapshot(t *testing.T) {
|
||||
vscArray: []snapshotv1api.VolumeSnapshotContent{
|
||||
*builder.ForVolumeSnapshotContent("vsc1").DeletionPolicy(snapshotv1api.VolumeSnapshotContentDelete).Status().Result(),
|
||||
},
|
||||
expectedVSArray: []*snapshotv1api.VolumeSnapshot{},
|
||||
expectedVSArray: []snapshotv1api.VolumeSnapshot{},
|
||||
expectedVSCArray: []snapshotv1api.VolumeSnapshotContent{
|
||||
*builder.ForVolumeSnapshotContent("vsc1").DeletionPolicy(snapshotv1api.VolumeSnapshotContentDelete).Status().Result(),
|
||||
},
|
||||
@@ -1439,11 +1458,13 @@ func TestDeleteVolumeSnapshot(t *testing.T) {
|
||||
for _, vs := range tc.vsArray {
|
||||
_, err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(vs.Namespace).Create(context.Background(), &vs, metav1.CreateOptions{})
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, sharedInformers.Snapshot().V1().VolumeSnapshots().Informer().GetStore().Add(&vs))
|
||||
}
|
||||
logger := logging.DefaultLogger(logrus.DebugLevel, logging.FormatText)
|
||||
|
||||
c.deleteVolumeSnapshot(tc.vsArray, tc.vscArray, logger)
|
||||
vsList, err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots("velero").List(context.Background(), metav1.ListOptions{})
|
||||
c.deleteVolumeSnapshots(tc.vsArray, tc.vscArray, logger, 30)
|
||||
|
||||
vsList, err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots("velero").List(context.TODO(), metav1.ListOptions{})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, len(tc.expectedVSArray), len(vsList.Items))
|
||||
for index := range tc.expectedVSArray {
|
||||
|
||||
Reference in New Issue
Block a user