refactor backup snapshot status updates into UpdateBackupSnapshotsStatus() and run in backup_finalizer_controller

Signed-off-by: Tiger Kaovilai <tkaovila@redhat.com>
This commit is contained in:
Tiger Kaovilai
2023-11-02 14:28:35 -04:00
parent 886e074b55
commit 9311a4269b
10 changed files with 198 additions and 77 deletions

View File

@@ -0,0 +1 @@
Update and wait for Backup.Status.CSIVolumeSnapshotsCompleted during finalize

View File

@@ -20,8 +20,6 @@ import (
"fmt"
"sort"
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
"github.com/vmware-tanzu/velero/internal/hook"
"github.com/vmware-tanzu/velero/internal/resourcepolicies"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
@@ -51,7 +49,6 @@ type Request struct {
VolumeSnapshots []*volume.Snapshot
PodVolumeBackups []*velerov1api.PodVolumeBackup
BackedUpItems map[itemKey]struct{}
CSISnapshots []snapshotv1api.VolumeSnapshot
itemOperationsList *[]*itemoperation.BackupOperation
ResPolicies *resourcepolicies.Policies
SkippedPVTracker *skipPVTracker

64
pkg/backup/snapshots.go Normal file
View File

@@ -0,0 +1,64 @@
package backup
import (
"context"
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/sets"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
)
// Common function to update the status of CSI snapshots
// returns VolumeSnapshot, VolumeSnapshotContent, VolumeSnapshotClasses referenced
func UpdateBackupCSISnapshotsStatus(client kbclient.Client, volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister, backup *velerov1api.Backup, backupLog logrus.FieldLogger) ([]snapshotv1api.VolumeSnapshot, []snapshotv1api.VolumeSnapshotContent, []snapshotv1api.VolumeSnapshotClass) {
var volumeSnapshots []snapshotv1api.VolumeSnapshot
var volumeSnapshotContents []snapshotv1api.VolumeSnapshotContent
var volumeSnapshotClasses []snapshotv1api.VolumeSnapshotClass
selector := label.NewSelectorForBackup(backup.Name)
vscList := &snapshotv1api.VolumeSnapshotContentList{}
if volumeSnapshotLister != nil {
tmpVSs, err := volumeSnapshotLister.List(label.NewSelectorForBackup(backup.Name))
if err != nil {
backupLog.Error(err)
}
for _, vs := range tmpVSs {
volumeSnapshots = append(volumeSnapshots, *vs)
}
}
err := client.List(context.Background(), vscList, &kbclient.ListOptions{LabelSelector: selector})
if err != nil {
backupLog.Error(err)
}
if len(vscList.Items) >= 0 {
volumeSnapshotContents = vscList.Items
}
vsClassSet := sets.NewString()
for index := range volumeSnapshotContents {
// persist the volumesnapshotclasses referenced by vsc
if volumeSnapshotContents[index].Spec.VolumeSnapshotClassName != nil && !vsClassSet.Has(*volumeSnapshotContents[index].Spec.VolumeSnapshotClassName) {
vsClass := &snapshotv1api.VolumeSnapshotClass{}
if err := client.Get(context.TODO(), kbclient.ObjectKey{Name: *volumeSnapshotContents[index].Spec.VolumeSnapshotClassName}, vsClass); err != nil {
backupLog.Error(err)
} else {
vsClassSet.Insert(*volumeSnapshotContents[index].Spec.VolumeSnapshotClassName)
volumeSnapshotClasses = append(volumeSnapshotClasses, *vsClass)
}
}
}
backup.Status.CSIVolumeSnapshotsAttempted = len(volumeSnapshots)
for _, vs := range volumeSnapshots {
if vs.Status != nil && boolptr.IsSetToTrue(vs.Status.ReadyToUse) {
backup.Status.CSIVolumeSnapshotsCompleted++
}
}
return volumeSnapshots, volumeSnapshotContents, volumeSnapshotClasses
}

View File

@@ -768,7 +768,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
backupStoreGetter,
s.config.formatFlag.Parse(),
s.csiSnapshotLister,
s.csiSnapshotClient,
s.credentialFileStore,
s.config.maxConcurrentK8SConnections,
s.config.defaultSnapshotMoveData,
@@ -832,6 +831,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
cmd.CheckError(err)
r := controller.NewBackupFinalizerReconciler(
s.mgr.GetClient(),
s.csiSnapshotLister,
clock.RealClock{},
backupper,
newPluginManager,

View File

@@ -34,7 +34,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
@@ -112,7 +111,6 @@ func NewBackupReconciler(
backupStoreGetter persistence.ObjectBackupStoreGetter,
formatFlag logging.Format,
volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister,
volumeSnapshotClient snapshotterClientSet.Interface,
credentialStore credentials.FileStore,
maxConcurrentK8SConnections int,
defaultSnapshotMoveData bool,
@@ -138,7 +136,6 @@ func NewBackupReconciler(
backupStoreGetter: backupStoreGetter,
formatFlag: formatFlag,
volumeSnapshotLister: volumeSnapshotLister,
volumeSnapshotClient: volumeSnapshotClient,
credentialFileStore: credentialStore,
maxConcurrentK8SConnections: maxConcurrentK8SConnections,
defaultSnapshotMoveData: defaultSnapshotMoveData,
@@ -660,65 +657,15 @@ func (b *backupReconciler) runBackup(backup *pkgbackup.Request) error {
fatalErrs = append(fatalErrs, err)
}
// Empty slices here so that they can be passed in to the persistBackup call later, regardless of whether or not CSI's enabled.
// This way, we only make the Lister call if the feature flag's on.
var volumeSnapshots []snapshotv1api.VolumeSnapshot
var volumeSnapshotContents []snapshotv1api.VolumeSnapshotContent
var volumeSnapshotClasses []snapshotv1api.VolumeSnapshotClass
if boolptr.IsSetToTrue(backup.Spec.SnapshotMoveData) {
backupLog.Info("backup SnapshotMoveData is set to true, skip VolumeSnapshot resource persistence.")
} else if features.IsEnabled(velerov1api.CSIFeatureFlag) {
selector := label.NewSelectorForBackup(backup.Name)
vscList := &snapshotv1api.VolumeSnapshotContentList{}
if b.volumeSnapshotLister != nil {
tmpVSs, err := b.volumeSnapshotLister.List(label.NewSelectorForBackup(backup.Name))
if err != nil {
backupLog.Error(err)
}
for _, vs := range tmpVSs {
volumeSnapshots = append(volumeSnapshots, *vs)
}
}
backup.CSISnapshots = volumeSnapshots
err = b.kbClient.List(context.Background(), vscList, &kbclient.ListOptions{LabelSelector: selector})
if err != nil {
backupLog.Error(err)
}
if len(vscList.Items) >= 0 {
volumeSnapshotContents = vscList.Items
}
vsClassSet := sets.NewString()
for index := range volumeSnapshotContents {
// persist the volumesnapshotclasses referenced by vsc
if volumeSnapshotContents[index].Spec.VolumeSnapshotClassName != nil && !vsClassSet.Has(*volumeSnapshotContents[index].Spec.VolumeSnapshotClassName) {
vsClass := &snapshotv1api.VolumeSnapshotClass{}
if err := b.kbClient.Get(context.TODO(), kbclient.ObjectKey{Name: *volumeSnapshotContents[index].Spec.VolumeSnapshotClassName}, vsClass); err != nil {
backupLog.Error(err)
} else {
vsClassSet.Insert(*volumeSnapshotContents[index].Spec.VolumeSnapshotClassName)
volumeSnapshotClasses = append(volumeSnapshotClasses, *vsClass)
}
}
}
}
// native snapshots phase will either be failed or completed right away
// https://github.com/vmware-tanzu/velero/blob/de3ea52f0cc478e99efa7b9524c7f353514261a4/pkg/backup/item_backupper.go#L632-L639
backup.Status.VolumeSnapshotsAttempted = len(backup.VolumeSnapshots)
for _, snap := range backup.VolumeSnapshots {
if snap.Status.Phase == volume.SnapshotPhaseCompleted {
backup.Status.VolumeSnapshotsCompleted++
}
}
backup.Status.CSIVolumeSnapshotsAttempted = len(backup.CSISnapshots)
for _, vs := range backup.CSISnapshots {
if vs.Status != nil && boolptr.IsSetToTrue(vs.Status.ReadyToUse) {
backup.Status.CSIVolumeSnapshotsCompleted++
}
}
volumeSnapshots, volumeSnapshotContents, volumeSnapshotClasses := pkgbackup.UpdateBackupCSISnapshotsStatus(b.kbClient, b.volumeSnapshotLister, backup.Backup, backupLog)
// Iterate over backup item operations and update progress.
// Any errors on operations at this point should be added to backup errors.

View File

@@ -43,7 +43,6 @@ import (
"k8s.io/utils/clock"
testclocks "k8s.io/utils/clock/testing"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
@@ -1059,7 +1058,7 @@ func TestProcessBackupCompletions(t *testing.T) {
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
CSIVolumeSnapshotsAttempted: 0,
CSIVolumeSnapshotsAttempted: 1,
CSIVolumeSnapshotsCompleted: 0,
},
},
@@ -1181,7 +1180,7 @@ func TestProcessBackupCompletions(t *testing.T) {
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
CSIVolumeSnapshotsAttempted: 0,
CSIVolumeSnapshotsAttempted: 1,
CSIVolumeSnapshotsCompleted: 0,
},
},
@@ -1263,7 +1262,7 @@ func TestProcessBackupCompletions(t *testing.T) {
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
CSIVolumeSnapshotsAttempted: 0,
CSIVolumeSnapshotsAttempted: 1,
CSIVolumeSnapshotsCompleted: 0,
},
},
@@ -1710,7 +1709,7 @@ func TestPatchResourceWorksWithStatus(t *testing.T) {
},
}
// check original exists
if err := fakeClient.Get(context.Background(), client.ObjectKeyFromObject(tt.args.updated), fromCluster); err != nil {
if err := fakeClient.Get(context.Background(), kbclient.ObjectKeyFromObject(tt.args.updated), fromCluster); err != nil {
t.Errorf("PatchResource() error = %v", err)
}
// ignore resourceVersion
@@ -1720,7 +1719,7 @@ func TestPatchResourceWorksWithStatus(t *testing.T) {
t.Errorf("PatchResource() error = %v, wantErr %v", err, tt.wantErr)
}
// check updated exists
if err := fakeClient.Get(context.Background(), client.ObjectKeyFromObject(tt.args.updated), fromCluster); err != nil {
if err := fakeClient.Get(context.Background(), kbclient.ObjectKeyFromObject(tt.args.updated), fromCluster); err != nil {
t.Errorf("PatchResource() error = %v", err)
}

View File

@@ -25,10 +25,13 @@ import (
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
clocks "k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
pkgbackup "github.com/vmware-tanzu/velero/pkg/backup"
"github.com/vmware-tanzu/velero/pkg/metrics"
@@ -40,19 +43,21 @@ import (
// backupFinalizerReconciler reconciles a Backup object
type backupFinalizerReconciler struct {
client kbclient.Client
clock clocks.WithTickerAndDelayedExecution
backupper pkgbackup.Backupper
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupTracker BackupTracker
metrics *metrics.ServerMetrics
backupStoreGetter persistence.ObjectBackupStoreGetter
log logrus.FieldLogger
client kbclient.Client
volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister
clock clocks.WithTickerAndDelayedExecution
backupper pkgbackup.Backupper
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupTracker BackupTracker
metrics *metrics.ServerMetrics
backupStoreGetter persistence.ObjectBackupStoreGetter
log logrus.FieldLogger
}
// NewBackupFinalizerReconciler initializes and returns backupFinalizerReconciler struct.
func NewBackupFinalizerReconciler(
client kbclient.Client,
volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister,
clock clocks.WithTickerAndDelayedExecution,
backupper pkgbackup.Backupper,
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
@@ -186,7 +191,16 @@ func (r *backupFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
backup.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
recordBackupMetrics(log, backup, outBackupFile, r.metrics, true)
CSISnapshotsNotReady := errors.New("CSI snapshots not ready")
retry.OnError(retry.DefaultBackoff, func(err error) bool {
return err == CSISnapshotsNotReady
}, func() error {
pkgbackup.UpdateBackupCSISnapshotsStatus(r.client, r.volumeSnapshotLister, backup, log)
if backup.Status.CSIVolumeSnapshotsCompleted < backup.Status.CSIVolumeSnapshotsAttempted {
return CSISnapshotsNotReady
}
return nil
})
// update backup metadata in object store
backupJSON := new(bytes.Buffer)
if err := encode.To(backup, "json", backupJSON); err != nil {

View File

@@ -23,6 +23,7 @@ import (
"testing"
"time"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@@ -43,12 +44,14 @@ import (
"github.com/vmware-tanzu/velero/pkg/plugin/framework"
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
velerotestmocks "github.com/vmware-tanzu/velero/pkg/test/mocks"
)
func mockBackupFinalizerReconciler(fakeClient kbclient.Client, fakeClock *testclocks.FakeClock) (*backupFinalizerReconciler, *fakeBackupper) {
func mockBackupFinalizerReconciler(fakeClient kbclient.Client, fakeVolumeSnapshotLister snapshotv1listers.VolumeSnapshotLister, fakeClock *testclocks.FakeClock) (*backupFinalizerReconciler, *fakeBackupper) {
backupper := new(fakeBackupper)
return NewBackupFinalizerReconciler(
fakeClient,
fakeVolumeSnapshotLister,
fakeClock,
backupper,
func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager },
@@ -160,7 +163,10 @@ func TestBackupFinalizerReconcile(t *testing.T) {
}
fakeClient := velerotest.NewFakeControllerRuntimeClient(t, initObjs...)
reconciler, backupper := mockBackupFinalizerReconciler(fakeClient, fakeClock)
fakeVolumeSnapshotLister := velerotestmocks.NewVolumeSnapshotLister(t)
reconciler, backupper := mockBackupFinalizerReconciler(fakeClient, fakeVolumeSnapshotLister, fakeClock)
pluginManager.On("CleanupClients").Return(nil)
backupStore.On("GetBackupItemOperations", test.backup.Name).Return(test.backupOperations, nil)
backupStore.On("GetBackupContents", mock.Anything).Return(io.NopCloser(bytes.NewReader([]byte("hello world"))), nil)

20
pkg/test/mocks.go Normal file
View File

@@ -0,0 +1,20 @@
package test
import (
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"
"k8s.io/apimachinery/pkg/labels"
)
// VolumeSnapshotLister helps list VolumeSnapshots.
// All objects returned here must be treated as read-only.
//
//go:generate mockery --name VolumeSnapshotLister
type VolumeSnapshotLister interface {
// List lists all VolumeSnapshots in the indexer.
// Objects returned here must be treated as read-only.
List(selector labels.Selector) (ret []*snapshotv1.VolumeSnapshot, err error)
// VolumeSnapshots returns an object that can list and get VolumeSnapshots.
VolumeSnapshots(namespace string) snapshotv1listers.VolumeSnapshotNamespaceLister
snapshotv1listers.VolumeSnapshotListerExpansion
}

View File

@@ -0,0 +1,73 @@
// Code generated by mockery v2.35.4. DO NOT EDIT.
package mocks
import (
mock "github.com/stretchr/testify/mock"
labels "k8s.io/apimachinery/pkg/labels"
v1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
volumesnapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"
)
// VolumeSnapshotLister is an autogenerated mock type for the VolumeSnapshotLister type
type VolumeSnapshotLister struct {
mock.Mock
}
// List provides a mock function with given fields: selector
func (_m *VolumeSnapshotLister) List(selector labels.Selector) ([]*v1.VolumeSnapshot, error) {
ret := _m.Called(selector)
var r0 []*v1.VolumeSnapshot
var r1 error
if rf, ok := ret.Get(0).(func(labels.Selector) ([]*v1.VolumeSnapshot, error)); ok {
return rf(selector)
}
if rf, ok := ret.Get(0).(func(labels.Selector) []*v1.VolumeSnapshot); ok {
r0 = rf(selector)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*v1.VolumeSnapshot)
}
}
if rf, ok := ret.Get(1).(func(labels.Selector) error); ok {
r1 = rf(selector)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// VolumeSnapshots provides a mock function with given fields: namespace
func (_m *VolumeSnapshotLister) VolumeSnapshots(namespace string) volumesnapshotv1.VolumeSnapshotNamespaceLister {
ret := _m.Called(namespace)
var r0 volumesnapshotv1.VolumeSnapshotNamespaceLister
if rf, ok := ret.Get(0).(func(string) volumesnapshotv1.VolumeSnapshotNamespaceLister); ok {
r0 = rf(namespace)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(volumesnapshotv1.VolumeSnapshotNamespaceLister)
}
}
return r0
}
// NewVolumeSnapshotLister creates a new instance of VolumeSnapshotLister. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewVolumeSnapshotLister(t interface {
mock.TestingT
Cleanup(func())
}) *VolumeSnapshotLister {
mock := &VolumeSnapshotLister{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}