From 1f399432910ac9549f953d8200f886a9bded8ab3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Wenkai=20Yin=28=E5=B0=B9=E6=96=87=E5=BC=80=29?=
Date: Thu, 9 Jan 2025 14:42:34 +0800
Subject: [PATCH] Check the PVB status via the podvolume Backupper rather than
 calling the API server, to avoid API server issues
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Check the PVB status via the podvolume Backupper rather than calling the
API server, to avoid API server issues
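
The Backupper now records every PodVolumeBackup it creates in an
informer-backed cache.Indexer and answers status queries from that local
cache, so waiting for PVBs no longer hits the API server. In outline the
mechanism looks like the following sketch (a minimal, self-contained
illustration with simplified stand-in types, not Velero's actual
PodVolumeBackup; it assumes client-go v0.27+ for cache.NewObjectName):

    package main

    import (
        "fmt"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/tools/cache"
    )

    // pvb stands in for velerov1api.PodVolumeBackup in this sketch.
    type pvb struct {
        metav1.ObjectMeta
        podNamespace, podName string // stands in for Spec.Pod
        phase                 string // stands in for Status.Phase
    }

    const indexNamePod = "POD"

    func main() {
        // Key PVBs by their own namespace/name and additionally index them
        // by the namespace/name of the pod they back up.
        indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
            indexNamePod: func(obj interface{}) ([]string, error) {
                p := obj.(*pvb)
                return []string{cache.NewObjectName(p.podNamespace, p.podName).String()}, nil
            },
        })

        // The backupper adds each PVB it creates; in the real code the
        // informer's update events then call Update(pvb), keeping the
        // cached statuses current without extra API calls.
        _ = indexer.Add(&pvb{
            ObjectMeta:   metav1.ObjectMeta{Namespace: "velero", Name: "pvb1"},
            podNamespace: "default",
            podName:      "pod",
            phase:        "Completed",
        })

        // Look up by PVB namespace/name (what GetPodVolumeBackup does).
        obj, exists, _ := indexer.GetByKey(cache.NewObjectName("velero", "pvb1").String())
        fmt.Println(exists, obj.(*pvb).phase)

        // Look up by pod (what ListPodVolumeBackupsByPod does).
        objs, _ := indexer.ByIndex(indexNamePod, cache.NewObjectName("default", "pod").String())
        fmt.Println(len(objs))
    }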

Fixes #8587

Signed-off-by: Wenkai Yin(尹文开)
---
 changelogs/unreleased/8603-ywk253100 |  1 +
 pkg/backup/backup.go                 | 44 ++++----------
 pkg/backup/backup_test.go            | 20 +++++-
 pkg/podvolume/backupper.go           | 80 ++++++++++++++++++++--
 pkg/podvolume/backupper_factory.go   |  6 +-
 pkg/podvolume/backupper_test.go      | 91 +++++++++++++++++++++++++++-
 6 files changed, 200 insertions(+), 42 deletions(-)
 create mode 100644 changelogs/unreleased/8603-ywk253100

diff --git a/changelogs/unreleased/8603-ywk253100 b/changelogs/unreleased/8603-ywk253100
new file mode 100644
index 000000000..7c7510e82
--- /dev/null
+++ b/changelogs/unreleased/8603-ywk253100
@@ -0,0 +1 @@
+Check the PVB status via the podvolume Backupper rather than calling the API server, to avoid API server issues
\ No newline at end of file
diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go
index d30542887..280164cc4 100644
--- a/pkg/backup/backup.go
+++ b/pkg/backup/backup.go
@@ -35,10 +35,8 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
-	"k8s.io/apimachinery/pkg/selection"
 	kubeerrs "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/wait"
 	kbclient "sigs.k8s.io/controller-runtime/pkg/client"
@@ -315,7 +313,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
 
 	var podVolumeBackupper podvolume.Backupper
 	if kb.podVolumeBackupperFactory != nil {
-		podVolumeBackupper, err = kb.podVolumeBackupperFactory.NewBackupper(ctx, backupRequest.Backup, kb.uploaderType)
+		podVolumeBackupper, err = kb.podVolumeBackupperFactory.NewBackupper(ctx, log, backupRequest.Backup, kb.uploaderType)
 		if err != nil {
 			log.WithError(errors.WithStack(err)).Debugf("Error from NewBackupper")
 			return errors.WithStack(err)
@@ -745,6 +743,7 @@ func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, ite
 	log := itemBlock.Log
 	defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done()
 
+	// the post hooks will not execute until all PVBs of the item block's pods are processed
 	if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil {
 		log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock")
 		return
@@ -758,36 +757,20 @@ func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, ite
 	}
 }
 
+// waitUntilPVBsProcessed waits until all PVBs of the item block's pods are processed
 func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error {
-	requirement, err := labels.NewRequirement(velerov1api.BackupUIDLabel, selection.Equals, []string{string(itemBlock.itemBackupper.backupRequest.UID)})
-	if err != nil {
-		return errors.Wrapf(err, "failed to create label requirement")
-	}
-	options := &kbclient.ListOptions{
-		LabelSelector: labels.NewSelector().Add(*requirement),
-	}
-	pvbList := &velerov1api.PodVolumeBackupList{}
-	if err := kb.kbClient.List(context.Background(), pvbList, options); err != nil {
-		return errors.Wrap(err, "failed to list PVBs")
-	}
-
-	podMap := map[string]struct{}{}
-	for _, pod := range pods {
-		podMap[string(pod.Item.GetUID())] = struct{}{}
-	}
-
 	pvbMap := map[*velerov1api.PodVolumeBackup]bool{}
-	for i, pvb := range pvbList.Items {
-		if _, exist := podMap[string(pvb.Spec.Pod.UID)]; !exist {
-			continue
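+	// gather the PVBs of the item block's pods from the pod volume backupper's
+	// informer-backed cache instead of querying the API server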
+	for _, pod := range pods {
+		namespace, name := pod.Item.GetNamespace(), pod.Item.GetName()
+		pvbs, err := itemBlock.itemBackupper.podVolumeBackupper.ListPodVolumeBackupsByPod(namespace, name)
+		if err != nil {
+			return errors.Wrapf(err, "failed to list PodVolumeBackups for pod %s/%s", namespace, name)
 		}
-
-		processed := false
-		if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
-			pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
-			processed = true
+		for _, pvb := range pvbs {
+			pvbMap[pvb] = pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
+				pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed
 		}
-		pvbMap[&pvbList.Items[i]] = processed
 	}
 
 	checkFunc := func(context.Context) (done bool, err error) {
@@ -796,8 +778,8 @@ func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log l
 			if processed {
 				continue
 			}
-			updatedPVB := &velerov1api.PodVolumeBackup{}
-			if err := kb.kbClient.Get(ctx, kbclient.ObjectKeyFromObject(pvb), updatedPVB); err != nil {
+			updatedPVB, err := itemBlock.itemBackupper.podVolumeBackupper.GetPodVolumeBackup(pvb.Namespace, pvb.Name)
+			if err != nil {
 				allProcessed = false
 				log.Infof("failed to get PVB: %v", err)
 				continue
diff --git a/pkg/backup/backup_test.go b/pkg/backup/backup_test.go
index eb25d65f0..425fa827c 100644
--- a/pkg/backup/backup_test.go
+++ b/pkg/backup/backup_test.go
@@ -3945,7 +3945,7 @@ func TestBackupWithHooks(t *testing.T) {
 
 type fakePodVolumeBackupperFactory struct{}
 
-func (f *fakePodVolumeBackupperFactory) NewBackupper(context.Context, *velerov1.Backup, string) (podvolume.Backupper, error) {
+func (f *fakePodVolumeBackupperFactory) NewBackupper(context.Context, logrus.FieldLogger, *velerov1.Backup, string) (podvolume.Backupper, error) {
 	return &fakePodVolumeBackupper{}, nil
 }
 
@@ -3978,6 +3978,24 @@ func (b *fakePodVolumeBackupper) WaitAllPodVolumesProcessed(log logrus.FieldLogg
 	return b.pvbs
 }
 
+func (b *fakePodVolumeBackupper) GetPodVolumeBackup(namespace, name string) (*velerov1.PodVolumeBackup, error) {
+	for _, pvb := range b.pvbs {
+		if pvb.Namespace == namespace && pvb.Name == name {
+			return pvb, nil
+		}
+	}
+	return nil, nil
+}
+
+func (b *fakePodVolumeBackupper) ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1.PodVolumeBackup, error) {
+	var pvbs []*velerov1.PodVolumeBackup
+	for _, pvb := range b.pvbs {
+		if pvb.Spec.Pod.Namespace == podNamespace && pvb.Spec.Pod.Name == podName {
+			pvbs = append(pvbs, pvb)
+		}
+	}
+	return pvbs, nil
+}
+
 // TestBackupWithPodVolume runs backups of pods that are annotated for PodVolume backup,
 // and ensures that the pod volume backupper is called, that the returned PodVolumeBackups
 // are added to the Request object, and that when PVCs are backed up with PodVolume, the
diff --git a/pkg/podvolume/backupper.go b/pkg/podvolume/backupper.go
index 29452344e..09ad91db8 100644
--- a/pkg/podvolume/backupper.go
+++ b/pkg/podvolume/backupper.go
@@ -48,6 +48,8 @@ type Backupper interface {
 	// BackupPodVolumes backs up all specified volumes in a pod.
 	BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error)
 	WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup
+	GetPodVolumeBackup(namespace, name string) (*velerov1api.PodVolumeBackup, error)
+	ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1api.PodVolumeBackup, error)
 }
 
 type backupper struct {
@@ -59,7 +61,10 @@ type backupper struct {
 	pvbInformer         ctrlcache.Informer
 	handlerRegistration cache.ResourceEventHandlerRegistration
 	wg                  sync.WaitGroup
-	result              []*velerov1api.PodVolumeBackup
+	// pvbIndexer holds all PVBs created by this backupper and, via its embedded
+	// indexes, can quickly look them up by specific properties. The statuses of
+	// the PVBs are updated whenever the informer receives update events.
+	pvbIndexer cache.Indexer
 }
 
 type skippedPVC struct {
@@ -101,8 +106,23 @@ func (pbs *PVCBackupSummary) addSkipped(volumeName string, reason string) {
 	}
 }
 
+const indexNamePod = "POD"
+
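+// podIndexFunc indexes PVBs by the namespace/name of the pod they back up.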
+func podIndexFunc(obj interface{}) ([]string, error) {
+	pvb, ok := obj.(*velerov1api.PodVolumeBackup)
+	if !ok {
+		return nil, errors.Errorf("expected PodVolumeBackup, but got %T", obj)
+	}
+	if pvb == nil {
+		return nil, errors.New("PodVolumeBackup is nil")
+	}
+	return []string{cache.NewObjectName(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name).String()}, nil
+}
+
 func newBackupper(
 	ctx context.Context,
+	log logrus.FieldLogger,
 	repoLocker *repository.RepoLocker,
 	repoEnsurer *repository.Ensurer,
 	pvbInformer ctrlcache.Informer,
@@ -118,13 +137,19 @@ func newBackupper(
 		uploaderType: uploaderType,
 		pvbInformer:  pvbInformer,
 		wg:           sync.WaitGroup{},
-		result:       []*velerov1api.PodVolumeBackup{},
+		pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
+			indexNamePod: podIndexFunc,
+		}),
 	}
 
 	b.handlerRegistration, _ = pvbInformer.AddEventHandler(
 		cache.ResourceEventHandlerFuncs{
 			UpdateFunc: func(_, obj interface{}) {
-				pvb := obj.(*velerov1api.PodVolumeBackup)
+				pvb, ok := obj.(*velerov1api.PodVolumeBackup)
+				if !ok {
+					log.Errorf("expected PodVolumeBackup, but got %T", obj)
+					return
+				}
 
 				if pvb.GetLabels()[velerov1api.BackupUIDLabel] != string(backup.UID) {
 					return
@@ -135,7 +160,10 @@ func newBackupper(
 					return
 				}
 
-				b.result = append(b.result, pvb)
+				// the indexer inserts the PVB directly if the PVB to be updated doesn't exist yet
+				if err := b.pvbIndexer.Update(pvb); err != nil {
+					log.WithError(err).Errorf("failed to update PVB %s/%s in indexer", pvb.Namespace, pvb.Name)
+				}
 				b.wg.Done()
 			},
 		},
@@ -318,6 +346,12 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
 			continue
 		}
 		b.wg.Add(1)
+
+		if err := b.pvbIndexer.Add(volumeBackup); err != nil {
+			errs = append(errs, errors.Wrapf(err, "failed to add PodVolumeBackup %s/%s to indexer", volumeBackup.Namespace, volumeBackup.Name))
+			continue
+		}
+
 		podVolumeBackups = append(podVolumeBackups, volumeBackup)
 		pvcSummary.addBackedup(volumeName)
 	}
@@ -343,7 +377,12 @@ func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velero
 	case <-b.ctx.Done():
 		log.Error("timed out waiting for all PodVolumeBackups to complete")
 	case <-done:
-		for _, pvb := range b.result {
+		for _, obj := range b.pvbIndexer.List() {
+			pvb, ok := obj.(*velerov1api.PodVolumeBackup)
+			if !ok {
+				log.Errorf("expected PodVolumeBackup, but got %T", obj)
+				continue
+			}
 			podVolumeBackups = append(podVolumeBackups, pvb)
 			if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
 				log.Errorf("pod volume backup failed: %s", pvb.Status.Message)
@@ -353,6 +392,39 @@ func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velero
 	return podVolumeBackups
 }
 
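+// GetPodVolumeBackup returns the PVB with the given namespace and name from the indexer; it returns nil when no such PVB exists.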
+func (b *backupper) GetPodVolumeBackup(namespace, name string) (*velerov1api.PodVolumeBackup, error) {
+	obj, exist, err := b.pvbIndexer.GetByKey(cache.NewObjectName(namespace, name).String())
+	if err != nil {
+		return nil, err
+	}
+	if !exist {
+		return nil, nil
+	}
+	pvb, ok := obj.(*velerov1api.PodVolumeBackup)
+	if !ok {
+		return nil, errors.Errorf("expected PodVolumeBackup, but got %T", obj)
+	}
+	return pvb, nil
+}
+
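+// ListPodVolumeBackupsByPod returns all PVBs created for the pod with the given namespace and name.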
+func (b *backupper) ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1api.PodVolumeBackup, error) {
+	objs, err := b.pvbIndexer.ByIndex(indexNamePod, cache.NewObjectName(podNamespace, podName).String())
+	if err != nil {
+		return nil, err
+	}
+	var pvbs []*velerov1api.PodVolumeBackup
+	for _, obj := range objs {
+		pvb, ok := obj.(*velerov1api.PodVolumeBackup)
+		if !ok {
+			return nil, errors.Errorf("expected PodVolumeBackup, but got %T", obj)
+		}
+		pvbs = append(pvbs, pvb)
+	}
+	return pvbs, nil
+}
+
 func skipAllPodVolumes(pod *corev1api.Pod, volumesToBackup []string, err error, pvcSummary *PVCBackupSummary, log logrus.FieldLogger) {
 	for _, volumeName := range volumesToBackup {
 		log.WithError(err).Warnf("Skip pod volume %s", volumeName)
diff --git a/pkg/podvolume/backupper_factory.go b/pkg/podvolume/backupper_factory.go
index fb166f110..f75f1d30b 100644
--- a/pkg/podvolume/backupper_factory.go
+++ b/pkg/podvolume/backupper_factory.go
@@ -32,7 +32,7 @@ import (
 // BackupperFactory can construct pod volumes backuppers.
 type BackupperFactory interface {
 	// NewBackupper returns a pod volumes backupper for use during a single Velero backup.
-	NewBackupper(context.Context, *velerov1api.Backup, string) (Backupper, error)
+	NewBackupper(context.Context, logrus.FieldLogger, *velerov1api.Backup, string) (Backupper, error)
 }
 
 func NewBackupperFactory(
@@ -59,8 +59,8 @@ type backupperFactory struct {
 	log logrus.FieldLogger
 }
 
-func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
-	b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup)
+func (bf *backupperFactory) NewBackupper(ctx context.Context, log logrus.FieldLogger, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
+	b := newBackupper(ctx, log, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup)
 
 	if !cache.WaitForCacheSync(ctx.Done(), bf.pvbInformer.HasSynced) {
 		return nil, errors.New("timed out waiting for caches to sync")
diff --git a/pkg/podvolume/backupper_test.go b/pkg/podvolume/backupper_test.go
index 941436830..1456bdf11 100644
--- a/pkg/podvolume/backupper_test.go
+++ b/pkg/podvolume/backupper_test.go
@@ -315,6 +315,7 @@ func TestBackupPodVolumes(t *testing.T) {
 	scheme := runtime.NewScheme()
 	velerov1api.AddToScheme(scheme)
 	corev1api.AddToScheme(scheme)
+	log := logrus.New()
 
 	tests := []struct {
 		name string
@@ -594,7 +595,7 @@ func TestBackupPodVolumes(t *testing.T) {
 			backupObj.Spec.StorageLocation = test.bsl
 
 			factory := NewBackupperFactory(repository.NewRepoLocker(), ensurer, fakeCtrlClient, pvbInformer, velerotest.NewLogger())
-			bp, err := factory.NewBackupper(ctx, backupObj, test.uploaderType)
+			bp, err := factory.NewBackupper(ctx, log, backupObj, test.uploaderType)
 
 			require.NoError(t, err)
 
@@ -619,6 +620,91 @@ func TestBackupPodVolumes(t *testing.T) {
 	}
 }
 
+func TestGetPodVolumeBackup(t *testing.T) {
+	backupper := &backupper{
+		pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
+			indexNamePod: podIndexFunc,
+		}),
+	}
+
+	obj := &velerov1api.PodVolumeBackup{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: "velero",
+			Name:      "pvb",
+		},
+		Spec: velerov1api.PodVolumeBackupSpec{
+			Pod: corev1api.ObjectReference{
+				Kind:      "Pod",
+				Namespace: "default",
+				Name:      "pod",
+			},
+		},
+	}
+
+	err := backupper.pvbIndexer.Add(obj)
+	require.NoError(t, err)
+
+	// non-existent PVB
+	pvb, err := backupper.GetPodVolumeBackup("invalid-namespace", "invalid-name")
+	require.NoError(t, err)
+	assert.Nil(t, pvb)
+
+	// existing PVB
+	pvb, err = backupper.GetPodVolumeBackup("velero", "pvb")
+	require.NoError(t, err)
+	assert.NotNil(t, pvb)
+}
+
+func TestListPodVolumeBackupsByPod(t *testing.T) {
+	backupper := &backupper{
+		pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
+			indexNamePod: podIndexFunc,
+		}),
+	}
+
+	obj1 := &velerov1api.PodVolumeBackup{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: "velero",
+			Name:      "pvb1",
+		},
+		Spec: velerov1api.PodVolumeBackupSpec{
+			Pod: corev1api.ObjectReference{
+				Kind:      "Pod",
+				Namespace: "default",
+				Name:      "pod",
+			},
+		},
+	}
+	obj2 := &velerov1api.PodVolumeBackup{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: "velero",
+			Name:      "pvb2",
+		},
+		Spec: velerov1api.PodVolumeBackupSpec{
+			Pod: corev1api.ObjectReference{
+				Kind:      "Pod",
+				Namespace: "default",
+				Name:      "pod",
+			},
+		},
+	}
+
+	err := backupper.pvbIndexer.Add(obj1)
+	require.NoError(t, err)
+	err = backupper.pvbIndexer.Add(obj2)
+	require.NoError(t, err)
+
+	// non-existent PVBs
+	pvbs, err := backupper.ListPodVolumeBackupsByPod("invalid-namespace", "invalid-name")
+	require.NoError(t, err)
+	assert.Empty(t, pvbs)
+
+	// existing PVBs
+	pvbs, err = backupper.ListPodVolumeBackupsByPod("default", "pod")
+	require.NoError(t, err)
+	assert.Len(t, pvbs, 2)
+}
+
 type logHook struct {
 	entry *logrus.Entry
 }
@@ -636,6 +722,7 @@ func TestWaitAllPodVolumesProcessed(t *testing.T) {
 	defer func() {
 		cancelFunc()
 	}()
+	log := logrus.New()
 	cases := []struct {
 		name string
 		ctx  context.Context
@@ -691,7 +778,7 @@ func TestWaitAllPodVolumesProcessed(t *testing.T) {
 			logHook := &logHook{}
 			logger.Hooks.Add(logHook)
 
-			backuper := newBackupper(c.ctx, nil, nil, informer, nil, "", &velerov1api.Backup{})
+			backuper := newBackupper(c.ctx, log, nil, nil, informer, nil, "", &velerov1api.Backup{})
 			backuper.wg.Add(1)
 
 			if c.statusToBeUpdated != nil {