Check the PVB status via the podvolume Backupper rather than calling the API server, to avoid API server issues

Check the PVB status via the podvolume Backupper rather than calling the API server, to avoid API server issues

Fixes #8587

Signed-off-by: Wenkai Yin(尹文开) <yinw@vmware.com>
This commit is contained in:
Wenkai Yin(尹文开)
2025-01-09 14:42:34 +08:00
parent 225db5e8c0
commit 1f39943291
6 changed files with 200 additions and 42 deletions

View File

@@ -0,0 +1 @@
Check the PVB status via the podvolume Backupper rather than calling the API server, to avoid API server issues

View File

@@ -35,10 +35,8 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
@@ -315,7 +313,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
var podVolumeBackupper podvolume.Backupper
if kb.podVolumeBackupperFactory != nil {
podVolumeBackupper, err = kb.podVolumeBackupperFactory.NewBackupper(ctx, backupRequest.Backup, kb.uploaderType)
podVolumeBackupper, err = kb.podVolumeBackupperFactory.NewBackupper(ctx, log, backupRequest.Backup, kb.uploaderType)
if err != nil {
log.WithError(errors.WithStack(err)).Debugf("Error from NewBackupper")
return errors.WithStack(err)
@@ -745,6 +743,7 @@ func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, ite
log := itemBlock.Log
defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done()
// the post hooks will not execute until all PVBs of the item block pods are processed
if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil {
log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock")
return
@@ -758,36 +757,19 @@ func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, ite
}
}
// wait for all PVBs of the item block pods to be processed
func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error {
requirement, err := labels.NewRequirement(velerov1api.BackupUIDLabel, selection.Equals, []string{string(itemBlock.itemBackupper.backupRequest.UID)})
if err != nil {
return errors.Wrapf(err, "failed to create label requirement")
}
options := &kbclient.ListOptions{
LabelSelector: labels.NewSelector().Add(*requirement),
}
pvbList := &velerov1api.PodVolumeBackupList{}
if err := kb.kbClient.List(context.Background(), pvbList, options); err != nil {
return errors.Wrap(err, "failed to list PVBs")
}
podMap := map[string]struct{}{}
for _, pod := range pods {
podMap[string(pod.Item.GetUID())] = struct{}{}
}
pvbMap := map[*velerov1api.PodVolumeBackup]bool{}
for i, pvb := range pvbList.Items {
if _, exist := podMap[string(pvb.Spec.Pod.UID)]; !exist {
continue
for _, pod := range pods {
namespace, name := pod.Item.GetNamespace(), pod.Item.GetName()
pvbs, err := itemBlock.itemBackupper.podVolumeBackupper.ListPodVolumeBackupsByPod(namespace, name)
if err != nil {
return errors.Wrapf(err, "failed to list PodVolumeBackups for pod %s/%s", namespace, name)
}
processed := false
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
processed = true
for _, pvb := range pvbs {
pvbMap[pvb] = pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed
}
pvbMap[&pvbList.Items[i]] = processed
}
checkFunc := func(context.Context) (done bool, err error) {
@@ -796,8 +778,8 @@ func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log l
if processed {
continue
}
updatedPVB := &velerov1api.PodVolumeBackup{}
if err := kb.kbClient.Get(ctx, kbclient.ObjectKeyFromObject(pvb), updatedPVB); err != nil {
updatedPVB, err := itemBlock.itemBackupper.podVolumeBackupper.GetPodVolumeBackup(pvb.Namespace, pvb.Name)
if err != nil {
allProcessed = false
log.Infof("failed to get PVB: %v", err)
continue

View File

@@ -3945,7 +3945,7 @@ func TestBackupWithHooks(t *testing.T) {
// fakePodVolumeBackupperFactory is a test double for the pod volume backupper
// factory; its NewBackupper hands back a fakePodVolumeBackupper.
type fakePodVolumeBackupperFactory struct{}
func (f *fakePodVolumeBackupperFactory) NewBackupper(context.Context, *velerov1.Backup, string) (podvolume.Backupper, error) {
func (f *fakePodVolumeBackupperFactory) NewBackupper(context.Context, logrus.FieldLogger, *velerov1.Backup, string) (podvolume.Backupper, error) {
return &fakePodVolumeBackupper{}, nil
}
@@ -3978,6 +3978,24 @@ func (b *fakePodVolumeBackupper) WaitAllPodVolumesProcessed(log logrus.FieldLogg
return b.pvbs
}
// GetPodVolumeBackup scans the fake's recorded PVBs and returns the first one
// whose own namespace and name match. It returns (nil, nil) when none match.
func (b *fakePodVolumeBackupper) GetPodVolumeBackup(namespace, name string) (*velerov1.PodVolumeBackup, error) {
	for i := range b.pvbs {
		candidate := b.pvbs[i]
		if candidate.Namespace != namespace || candidate.Name != name {
			continue
		}
		return candidate, nil
	}
	return nil, nil
}
// ListPodVolumeBackupsByPod returns every recorded PVB whose spec references
// the pod with the given namespace and name. The result is nil when no PVB
// matches; the error is always nil.
func (b *fakePodVolumeBackupper) ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1.PodVolumeBackup, error) {
	var matched []*velerov1.PodVolumeBackup
	for i := range b.pvbs {
		podRef := b.pvbs[i].Spec.Pod
		if podRef.Namespace != podNamespace || podRef.Name != podName {
			continue
		}
		matched = append(matched, b.pvbs[i])
	}
	return matched, nil
}
// TestBackupWithPodVolume runs backups of pods that are annotated for PodVolume backup,
// and ensures that the pod volume backupper is called, that the returned PodVolumeBackups
// are added to the Request object, and that when PVCs are backed up with PodVolume, the

View File

@@ -48,6 +48,8 @@ type Backupper interface {
// BackupPodVolumes backs up all specified volumes in a pod.
BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error)
WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup
GetPodVolumeBackup(namespace, name string) (*velerov1api.PodVolumeBackup, error)
ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1api.PodVolumeBackup, error)
}
type backupper struct {
@@ -59,7 +61,10 @@ type backupper struct {
pvbInformer ctrlcache.Informer
handlerRegistration cache.ResourceEventHandlerRegistration
wg sync.WaitGroup
result []*velerov1api.PodVolumeBackup
// pvbIndexer holds all PVBs created by this backupper and can look them up
// quickly by specific properties thanks to its embedded indexes.
// The PVB statuses are updated when the informer receives update events.
pvbIndexer cache.Indexer
}
type skippedPVC struct {
@@ -101,8 +106,22 @@ func (pbs *PVCBackupSummary) addSkipped(volumeName string, reason string) {
}
}
// indexNamePod is the name of the indexer index, backed by podIndexFunc, that
// keys PodVolumeBackups by the namespace and name of the pod in their spec.
const indexNamePod = "POD"
// podIndexFunc is the index function registered under indexNamePod. It maps a
// PodVolumeBackup to a single index key built from the namespace and name of
// the pod referenced in its spec.
//
// It returns an error when obj is not a *PodVolumeBackup, or when it is a nil
// pointer of that type.
func podIndexFunc(obj interface{}) ([]string, error) {
	switch pvb := obj.(type) {
	case *velerov1api.PodVolumeBackup:
		if pvb == nil {
			return nil, errors.New("PodVolumeBackup is nil")
		}
		podKey := cache.NewObjectName(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name)
		return []string{podKey.String()}, nil
	default:
		return nil, errors.Errorf("expected PodVolumeBackup, but got %T", obj)
	}
}
func newBackupper(
ctx context.Context,
log logrus.FieldLogger,
repoLocker *repository.RepoLocker,
repoEnsurer *repository.Ensurer,
pvbInformer ctrlcache.Informer,
@@ -118,13 +137,19 @@ func newBackupper(
uploaderType: uploaderType,
pvbInformer: pvbInformer,
wg: sync.WaitGroup{},
result: []*velerov1api.PodVolumeBackup{},
pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
indexNamePod: podIndexFunc,
}),
}
b.handlerRegistration, _ = pvbInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, obj interface{}) {
pvb := obj.(*velerov1api.PodVolumeBackup)
pvb, ok := obj.(*velerov1api.PodVolumeBackup)
if !ok {
log.Errorf("expected PodVolumeBackup, but got %T", obj)
return
}
if pvb.GetLabels()[velerov1api.BackupUIDLabel] != string(backup.UID) {
return
@@ -135,7 +160,10 @@ func newBackupper(
return
}
b.result = append(b.result, pvb)
// the Indexer inserts PVB directly if the PVB to be updated doesn't exist
if err := b.pvbIndexer.Update(pvb); err != nil {
log.WithError(err).Errorf("failed to update PVB %s/%s in indexer", pvb.Namespace, pvb.Name)
}
b.wg.Done()
},
},
@@ -318,6 +346,12 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
continue
}
b.wg.Add(1)
if err := b.pvbIndexer.Add(volumeBackup); err != nil {
errs = append(errs, errors.Wrapf(err, "failed to add PodVolumeBackup %s/%s to indexer", volumeBackup.Namespace, volumeBackup.Name))
continue
}
podVolumeBackups = append(podVolumeBackups, volumeBackup)
pvcSummary.addBackedup(volumeName)
}
@@ -343,7 +377,12 @@ func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velero
case <-b.ctx.Done():
log.Error("timed out waiting for all PodVolumeBackups to complete")
case <-done:
for _, pvb := range b.result {
for _, obj := range b.pvbIndexer.List() {
pvb, ok := obj.(*velerov1api.PodVolumeBackup)
if !ok {
log.Errorf("expected PodVolumeBackup, but got %T", obj)
continue
}
podVolumeBackups = append(podVolumeBackups, pvb)
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
log.Errorf("pod volume backup failed: %s", pvb.Status.Message)
@@ -353,6 +392,37 @@ func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velero
return podVolumeBackups
}
// GetPodVolumeBackup looks up a PodVolumeBackup in the backupper's indexer by
// its namespace and name.
//
// It returns (nil, nil) when the indexer has no entry for that key, and an
// error when the lookup itself fails or the stored object is not a
// *PodVolumeBackup.
func (b *backupper) GetPodVolumeBackup(namespace, name string) (*velerov1api.PodVolumeBackup, error) {
	key := cache.NewObjectName(namespace, name).String()
	item, found, err := b.pvbIndexer.GetByKey(key)
	switch {
	case err != nil:
		return nil, err
	case !found:
		// A missing entry is not treated as an error; the caller gets a nil PVB.
		return nil, nil
	}

	if pvb, isPVB := item.(*velerov1api.PodVolumeBackup); isPVB {
		return pvb, nil
	}
	return nil, errors.Errorf("expected PodVolumeBackup, but got %T", item)
}
// ListPodVolumeBackupsByPod returns the PodVolumeBackups that the indexer has
// filed under the given pod namespace and name in the indexNamePod index.
//
// The result is nil when nothing is indexed for that pod. An error is returned
// when the index lookup fails or an indexed object is not a *PodVolumeBackup.
func (b *backupper) ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1api.PodVolumeBackup, error) {
	podKey := cache.NewObjectName(podNamespace, podName).String()
	indexed, err := b.pvbIndexer.ByIndex(indexNamePod, podKey)
	if err != nil {
		return nil, err
	}

	var result []*velerov1api.PodVolumeBackup
	for i := range indexed {
		pvb, isPVB := indexed[i].(*velerov1api.PodVolumeBackup)
		if !isPVB {
			return nil, errors.Errorf("expected PodVolumeBackup, but got %T", indexed[i])
		}
		result = append(result, pvb)
	}
	return result, nil
}
func skipAllPodVolumes(pod *corev1api.Pod, volumesToBackup []string, err error, pvcSummary *PVCBackupSummary, log logrus.FieldLogger) {
for _, volumeName := range volumesToBackup {
log.WithError(err).Warnf("Skip pod volume %s", volumeName)

View File

@@ -32,7 +32,7 @@ import (
// BackupperFactory can construct pod volumes backuppers.
type BackupperFactory interface {
// NewBackupper returns a pod volumes backupper for use during a single Velero backup.
NewBackupper(context.Context, *velerov1api.Backup, string) (Backupper, error)
NewBackupper(context.Context, logrus.FieldLogger, *velerov1api.Backup, string) (Backupper, error)
}
func NewBackupperFactory(
@@ -59,8 +59,8 @@ type backupperFactory struct {
log logrus.FieldLogger
}
func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup)
func (bf *backupperFactory) NewBackupper(ctx context.Context, log logrus.FieldLogger, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
b := newBackupper(ctx, log, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup)
if !cache.WaitForCacheSync(ctx.Done(), bf.pvbInformer.HasSynced) {
return nil, errors.New("timed out waiting for caches to sync")

View File

@@ -315,6 +315,7 @@ func TestBackupPodVolumes(t *testing.T) {
scheme := runtime.NewScheme()
velerov1api.AddToScheme(scheme)
corev1api.AddToScheme(scheme)
log := logrus.New()
tests := []struct {
name string
@@ -594,7 +595,7 @@ func TestBackupPodVolumes(t *testing.T) {
backupObj.Spec.StorageLocation = test.bsl
factory := NewBackupperFactory(repository.NewRepoLocker(), ensurer, fakeCtrlClient, pvbInformer, velerotest.NewLogger())
bp, err := factory.NewBackupper(ctx, backupObj, test.uploaderType)
bp, err := factory.NewBackupper(ctx, log, backupObj, test.uploaderType)
require.NoError(t, err)
@@ -619,6 +620,91 @@ func TestBackupPodVolumes(t *testing.T) {
}
}
// TestGetPodVolumeBackup checks that GetPodVolumeBackup returns nil without an
// error for a key that is not in the indexer, and a non-nil PVB for a key that is.
func TestGetPodVolumeBackup(t *testing.T) {
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		indexNamePod: podIndexFunc,
	})
	bp := &backupper{pvbIndexer: indexer}

	stored := &velerov1api.PodVolumeBackup{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "velero",
			Name:      "pvb",
		},
		Spec: velerov1api.PodVolumeBackupSpec{
			Pod: corev1api.ObjectReference{
				Kind:      "Pod",
				Namespace: "default",
				Name:      "pod",
			},
		},
	}
	require.NoError(t, bp.pvbIndexer.Add(stored))

	// A key that was never added yields a nil PVB and no error.
	missing, err := bp.GetPodVolumeBackup("invalid-namespace", "invalid-name")
	require.NoError(t, err)
	assert.Nil(t, missing)

	// The key of the stored PVB yields a PVB.
	found, err := bp.GetPodVolumeBackup("velero", "pvb")
	require.NoError(t, err)
	assert.NotNil(t, found)
}
// TestListPodVolumeBackupsByPodp checks that ListPodVolumeBackupsByPod returns
// nothing for an unknown pod and exactly the PVBs of the requested pod
// otherwise.
//
// The indexer also holds a PVB that belongs to a different pod, so the test
// fails if the pod index hands back PVBs of other pods.
//
// NOTE(review): the trailing "p" in the function name looks like a typo; it is
// kept here so the test name stays stable.
func TestListPodVolumeBackupsByPodp(t *testing.T) {
	// newPVB builds a PVB in the "velero" namespace that references the pod
	// default/<podName>.
	newPVB := func(name, podName string) *velerov1api.PodVolumeBackup {
		return &velerov1api.PodVolumeBackup{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: "velero",
				Name:      name,
			},
			Spec: velerov1api.PodVolumeBackupSpec{
				Pod: corev1api.ObjectReference{
					Kind:      "Pod",
					Namespace: "default",
					Name:      podName,
				},
			},
		}
	}

	backupper := &backupper{
		pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
			indexNamePod: podIndexFunc,
		}),
	}

	obj1 := newPVB("pvb1", "pod")
	obj2 := newPVB("pvb2", "pod")
	// This PVB belongs to another pod and must never show up in the results below.
	otherPodPVB := newPVB("pvb3", "other-pod")
	for _, obj := range []*velerov1api.PodVolumeBackup{obj1, obj2, otherPodPVB} {
		require.NoError(t, backupper.pvbIndexer.Add(obj))
	}

	// not exist PVBs
	pvbs, err := backupper.ListPodVolumeBackupsByPod("invalid-namespace", "invalid-name")
	require.NoError(t, err)
	assert.Empty(t, pvbs)

	// exist PVBs: only the two PVBs of default/pod, in any order
	pvbs, err = backupper.ListPodVolumeBackupsByPod("default", "pod")
	require.NoError(t, err)
	assert.ElementsMatch(t, []*velerov1api.PodVolumeBackup{obj1, obj2}, pvbs)
}
// logHook is a test helper that holds a single logrus entry.
// NOTE(review): presumably a logrus hook that records the most recent entry it
// receives — its methods are outside this view, confirm before relying on it.
type logHook struct {
entry *logrus.Entry
}
@@ -636,6 +722,7 @@ func TestWaitAllPodVolumesProcessed(t *testing.T) {
defer func() {
cancelFunc()
}()
log := logrus.New()
cases := []struct {
name string
ctx context.Context
@@ -691,7 +778,7 @@ func TestWaitAllPodVolumesProcessed(t *testing.T) {
logHook := &logHook{}
logger.Hooks.Add(logHook)
backuper := newBackupper(c.ctx, nil, nil, informer, nil, "", &velerov1api.Backup{})
backuper := newBackupper(c.ctx, log, nil, nil, informer, nil, "", &velerov1api.Backup{})
backuper.wg.Add(1)
if c.statusToBeUpdated != nil {