Track the skipped PV in a backup and print the summary in backup log (#6496)

Partially address the requirements in #5834

Signed-off-by: Daniel Jiang <jiangd@vmware.com>
This commit is contained in:
Daniel Jiang
2023-07-20 16:13:48 +08:00
committed by GitHub
parent b4181ef803
commit 2548b20db9
16 changed files with 816 additions and 127 deletions

View File

@@ -0,0 +1 @@
Track the skipped PVC and print the summary in backup log

View File

@@ -430,8 +430,9 @@ func (kb *kubernetesBackupper) BackupWithResolvers(log logrus.FieldLogger,
if err := kube.PatchResource(backupRequest.Backup, updated, kb.kbClient); err != nil {
log.WithError(errors.WithStack((err))).Warn("Got error trying to update backup's status.progress")
}
skippedPVSummary, _ := json.Marshal(backupRequest.SkippedPVTracker.Summary())
log.Infof("Summary for skipped PVs: %s", skippedPVSummary)
backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: len(backupRequest.BackedUpItems), ItemsBackedUp: len(backupRequest.BackedUpItems)}
log.WithField("progress", "").Infof("Backed up a total of %d items", len(backupRequest.BackedUpItems))
return nil
@@ -590,12 +591,13 @@ func (kb *kubernetesBackupper) FinalizeBackup(log logrus.FieldLogger,
log.WithField("progress", "").Infof("Collected %d items from the async BIA operations PostOperationItems list", len(items))
itemBackupper := &itemBackupper{
backupRequest: backupRequest,
tarWriter: tw,
dynamicFactory: kb.dynamicFactory,
kbClient: kb.kbClient,
discoveryHelper: kb.discoveryHelper,
itemHookHandler: &hook.NoOpItemHookHandler{},
backupRequest: backupRequest,
tarWriter: tw,
dynamicFactory: kb.dynamicFactory,
kbClient: kb.kbClient,
discoveryHelper: kb.discoveryHelper,
itemHookHandler: &hook.NoOpItemHookHandler{},
podVolumeSnapshotTracker: newPVCSnapshotTracker(),
}
updateFiles := make(map[string]FileForArchive)
backedUpGroupResources := map[schema.GroupResource]bool{}

View File

@@ -26,6 +26,7 @@ import (
"io"
"sort"
"strings"
"sync"
"testing"
"time"
@@ -66,7 +67,10 @@ func TestBackedUpItemsMatchesTarballContents(t *testing.T) {
}
h := newHarness(t)
req := &Request{Backup: defaultBackup().Result()}
req := &Request{
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
}
backupFile := bytes.NewBuffer([]byte{})
apiResources := []*test.APIResource{
@@ -121,7 +125,10 @@ func TestBackedUpItemsMatchesTarballContents(t *testing.T) {
// the request's BackedUpItems field.
func TestBackupProgressIsUpdated(t *testing.T) {
h := newHarness(t)
req := &Request{Backup: defaultBackup().Result()}
req := &Request{
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
}
backupFile := bytes.NewBuffer([]byte{})
apiResources := []*test.APIResource{
@@ -853,8 +860,11 @@ func TestBackupOldResourceFiltering(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t)
req = &Request{Backup: tc.backup}
h = newHarness(t)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -1027,8 +1037,11 @@ func TestCRDInclusion(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t)
req = &Request{Backup: tc.backup}
h = newHarness(t)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -1119,8 +1132,11 @@ func TestBackupResourceCohabitation(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t)
req = &Request{Backup: tc.backup}
h = newHarness(t)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -1144,7 +1160,8 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
// run and verify backup 1
backup1 := &Request{
Backup: defaultBackup().Result(),
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
}
backup1File := bytes.NewBuffer([]byte{})
@@ -1157,7 +1174,8 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
// run and verify backup 2
backup2 := &Request{
Backup: defaultBackup().Result(),
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
}
backup2File := bytes.NewBuffer([]byte{})
@@ -1204,8 +1222,11 @@ func TestBackupResourceOrdering(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t)
req = &Request{Backup: tc.backup}
h = newHarness(t)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -1224,12 +1245,15 @@ func TestBackupResourceOrdering(t *testing.T) {
// to run for specific resources/namespaces and simply records the items
// that it is executed for.
type recordResourcesAction struct {
name string
selector velero.ResourceSelector
ids []string
backups []velerov1.Backup
executionErr error
additionalItems []velero.ResourceIdentifier
operationID string
postOperationItems []velero.ResourceIdentifier
skippedCSISnapshot bool
}
func (a *recordResourcesAction) Execute(item runtime.Unstructured, backup *velerov1.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, string, []velero.ResourceIdentifier, error) {
@@ -1239,8 +1263,13 @@ func (a *recordResourcesAction) Execute(item runtime.Unstructured, backup *veler
}
a.ids = append(a.ids, kubeutil.NamespaceAndName(metadata))
a.backups = append(a.backups, *backup)
return item, a.additionalItems, a.operationID, a.postOperationItems, nil
if a.skippedCSISnapshot {
u := &unstructured.Unstructured{Object: item.UnstructuredContent()}
u.SetAnnotations(map[string]string{skippedNoCSIPVAnnotation: "true"})
item = u
a.additionalItems = nil
}
return item, a.additionalItems, a.operationID, a.postOperationItems, a.executionErr
}
func (a *recordResourcesAction) AppliesTo() (velero.ResourceSelector, error) {
@@ -1256,7 +1285,7 @@ func (a *recordResourcesAction) Cancel(operationID string, backup *velerov1.Back
}
func (a *recordResourcesAction) Name() string {
return ""
return a.name
}
func (a *recordResourcesAction) ForResource(resource string) *recordResourcesAction {
@@ -1279,6 +1308,113 @@ func (a *recordResourcesAction) WithAdditionalItems(items []velero.ResourceIdent
return a
}
func (a *recordResourcesAction) WithName(name string) *recordResourcesAction {
a.name = name
return a
}
func (a *recordResourcesAction) WithExecutionErr(executionErr error) *recordResourcesAction {
a.executionErr = executionErr
return a
}
func (a *recordResourcesAction) WithSkippedCSISnapshotFlag(flag bool) *recordResourcesAction {
a.skippedCSISnapshot = flag
return a
}
// TestBackupItemActionsForSkippedPV runs backups with backup item actions, and
// verifies that the data in SkippedPVTracker is updated as expected.
func TestBackupItemActionsForSkippedPV(t *testing.T) {
tests := []struct {
name string
backupReq *Request
apiResources []*test.APIResource
actions []*recordResourcesAction
// {pvName:{approach: reason}}
expectSkippedPVs map[string]map[string]string
expectNotSkippedPVs []string
}{
{
name: "backup item action returns the 'not a CSI volume' error and the PV should be tracked as skippedPV",
backupReq: &Request{
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
},
apiResources: []*test.APIResource{
test.PVCs(
builder.ForPersistentVolumeClaim("ns-1", "pvc-1").VolumeName("pv-1").Result(),
),
},
actions: []*recordResourcesAction{
new(recordResourcesAction).WithName(csiBIAPluginName).ForNamespace("ns-1").ForResource("persistentvolumeclaims").WithSkippedCSISnapshotFlag(true),
},
expectSkippedPVs: map[string]map[string]string{
"pv-1": {
csiSnapshotApproach: "skipped b/c it's not a CSI volume",
},
},
},
{
name: "backup item action named as CSI plugin executed successfully and the PV will be removed from the skipped PV tracker",
backupReq: &Request{
Backup: defaultBackup().Result(),
SkippedPVTracker: &skipPVTracker{
RWMutex: &sync.RWMutex{},
pvs: map[string]map[string]string{
"pv-1": {
"any": "whatever reason",
},
},
},
},
apiResources: []*test.APIResource{
test.PVCs(
builder.ForPersistentVolumeClaim("ns-1", "pvc-1").VolumeName("pv-1").Result(),
),
},
actions: []*recordResourcesAction{
new(recordResourcesAction).ForNamespace("ns-1").ForResource("persistentvolumeclaims").WithName(csiBIAPluginName),
},
expectNotSkippedPVs: []string{"pv-1"},
},
}
for _, tc := range tests {
t.Run(tc.name, func(tt *testing.T) {
var (
h = newHarness(t)
backupFile = bytes.NewBuffer([]byte{})
)
for _, resource := range tc.apiResources {
h.addItems(t, resource)
}
actions := []biav2.BackupItemAction{}
for _, action := range tc.actions {
actions = append(actions, action)
}
err := h.backupper.Backup(h.log, tc.backupReq, backupFile, actions, nil)
assert.NoError(t, err)
if tc.expectSkippedPVs != nil {
for pvName, reasons := range tc.expectSkippedPVs {
v, ok := tc.backupReq.SkippedPVTracker.pvs[pvName]
assert.True(tt, ok)
for approach, reason := range reasons {
assert.Equal(tt, reason, v[approach])
}
}
}
for _, pvName := range tc.expectNotSkippedPVs {
_, ok := tc.backupReq.SkippedPVTracker.pvs[pvName]
assert.False(tt, ok)
}
})
}
}
// TestBackupActionsRunsForCorrectItems runs backups with backup item actions, and
// verifies that each backup item action is run for the correct set of resources based on its
// AppliesTo() resource selector. Verification is done by using the recordResourcesAction struct,
@@ -1456,8 +1592,11 @@ func TestBackupActionsRunForCorrectItems(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t)
req = &Request{Backup: tc.backup}
h = newHarness(t)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -1534,8 +1673,11 @@ func TestBackupWithInvalidActions(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t)
req = &Request{Backup: tc.backup}
h = newHarness(t)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -1677,8 +1819,11 @@ func TestBackupActionModifications(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t)
req = &Request{Backup: tc.backup}
h = newHarness(t)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -1929,8 +2074,11 @@ func TestBackupActionAdditionalItems(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t)
req = &Request{Backup: tc.backup}
h = newHarness(t)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -2077,6 +2225,7 @@ func (*fakeVolumeSnapshotter) DeleteSnapshot(snapshotID string) error {
// looking at the backup request's VolumeSnapshots field. This test uses the fakeVolumeSnapshotter
// struct in place of real volume snapshotters.
func TestBackupWithSnapshots(t *testing.T) {
// TODO: add more verification for skippedPVTracker
tests := []struct {
name string
req *Request
@@ -2092,6 +2241,7 @@ func TestBackupWithSnapshots(t *testing.T) {
SnapshotLocations: []*velerov1.VolumeSnapshotLocation{
newSnapshotLocation("velero", "default", "default"),
},
SkippedPVTracker: NewSkipPVTracker(),
},
apiResources: []*test.APIResource{
test.PVs(
@@ -2125,6 +2275,7 @@ func TestBackupWithSnapshots(t *testing.T) {
SnapshotLocations: []*velerov1.VolumeSnapshotLocation{
newSnapshotLocation("velero", "default", "default"),
},
SkippedPVTracker: NewSkipPVTracker(),
},
apiResources: []*test.APIResource{
test.PVs(
@@ -2159,6 +2310,7 @@ func TestBackupWithSnapshots(t *testing.T) {
SnapshotLocations: []*velerov1.VolumeSnapshotLocation{
newSnapshotLocation("velero", "default", "default"),
},
SkippedPVTracker: NewSkipPVTracker(),
},
apiResources: []*test.APIResource{
test.PVs(
@@ -2193,6 +2345,7 @@ func TestBackupWithSnapshots(t *testing.T) {
SnapshotLocations: []*velerov1.VolumeSnapshotLocation{
newSnapshotLocation("velero", "default", "default"),
},
SkippedPVTracker: NewSkipPVTracker(),
},
apiResources: []*test.APIResource{
test.PVs(
@@ -2227,6 +2380,7 @@ func TestBackupWithSnapshots(t *testing.T) {
SnapshotLocations: []*velerov1.VolumeSnapshotLocation{
newSnapshotLocation("velero", "default", "default"),
},
SkippedPVTracker: NewSkipPVTracker(),
},
apiResources: []*test.APIResource{
test.PVs(
@@ -2259,6 +2413,7 @@ func TestBackupWithSnapshots(t *testing.T) {
SnapshotLocations: []*velerov1.VolumeSnapshotLocation{
newSnapshotLocation("velero", "default", "default"),
},
SkippedPVTracker: NewSkipPVTracker(),
},
apiResources: []*test.APIResource{
test.PVs(
@@ -2273,7 +2428,8 @@ func TestBackupWithSnapshots(t *testing.T) {
{
name: "backup with no volume snapshot locations does not create any snapshots",
req: &Request{
Backup: defaultBackup().Result(),
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
},
apiResources: []*test.APIResource{
test.PVs(
@@ -2292,6 +2448,7 @@ func TestBackupWithSnapshots(t *testing.T) {
SnapshotLocations: []*velerov1.VolumeSnapshotLocation{
newSnapshotLocation("velero", "default", "default"),
},
SkippedPVTracker: NewSkipPVTracker(),
},
apiResources: []*test.APIResource{
test.PVs(
@@ -2308,6 +2465,7 @@ func TestBackupWithSnapshots(t *testing.T) {
SnapshotLocations: []*velerov1.VolumeSnapshotLocation{
newSnapshotLocation("velero", "default", "default"),
},
SkippedPVTracker: NewSkipPVTracker(),
},
apiResources: []*test.APIResource{
test.PVs(
@@ -2327,6 +2485,7 @@ func TestBackupWithSnapshots(t *testing.T) {
newSnapshotLocation("velero", "default", "default"),
newSnapshotLocation("velero", "another", "another"),
},
SkippedPVTracker: NewSkipPVTracker(),
},
apiResources: []*test.APIResource{
test.PVs(
@@ -2455,7 +2614,8 @@ func TestBackupWithAsyncOperations(t *testing.T) {
{
name: "action that starts a short-running process records operation",
req: &Request{
Backup: defaultBackup().Result(),
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
},
apiResources: []*test.APIResource{
test.Pods(
@@ -2484,7 +2644,8 @@ func TestBackupWithAsyncOperations(t *testing.T) {
{
name: "action that starts a long-running process records operation",
req: &Request{
Backup: defaultBackup().Result(),
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
},
apiResources: []*test.APIResource{
test.Pods(
@@ -2513,7 +2674,8 @@ func TestBackupWithAsyncOperations(t *testing.T) {
{
name: "action that has no operation doesn't record one",
req: &Request{
Backup: defaultBackup().Result(),
Backup: defaultBackup().Result(),
SkippedPVTracker: NewSkipPVTracker(),
},
apiResources: []*test.APIResource{
test.Pods(
@@ -2592,8 +2754,11 @@ func TestBackupWithInvalidHooks(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t)
req = &Request{Backup: tc.backup}
h = newHarness(t)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -2840,8 +3005,11 @@ func TestBackupWithHooks(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t)
req = &Request{Backup: tc.backup}
h = newHarness(t)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
}
backupFile = bytes.NewBuffer([]byte{})
podCommandExecutor = new(test.MockPodCommandExecutor)
)
@@ -2881,12 +3049,13 @@ type fakePodVolumeBackupper struct{}
// BackupPodVolumes returns one pod volume backup per entry in volumes, with namespace "velero"
// and name "pvb-<pod-namespace>-<pod-name>-<volume-name>".
func (b *fakePodVolumeBackupper) BackupPodVolumes(backup *velerov1.Backup, pod *corev1.Pod, volumes []string, _ *resourcepolicies.Policies, _ logrus.FieldLogger) ([]*velerov1.PodVolumeBackup, []error) {
func (b *fakePodVolumeBackupper) BackupPodVolumes(backup *velerov1.Backup, pod *corev1.Pod, volumes []string, _ *resourcepolicies.Policies, _ logrus.FieldLogger) ([]*velerov1.PodVolumeBackup, *podvolume.PVCBackupSummary, []error) {
var res []*velerov1.PodVolumeBackup
pvcSummary := podvolume.NewPVCBackupSummary()
anno := pod.GetAnnotations()
if anno != nil && anno["backup.velero.io/bakupper-skip"] != "" {
return res, nil
return res, pvcSummary, nil
}
for _, vol := range volumes {
@@ -2894,7 +3063,7 @@ func (b *fakePodVolumeBackupper) BackupPodVolumes(backup *velerov1.Backup, pod *
res = append(res, pvb)
}
return res, nil
return res, pvcSummary, nil
}
// TestBackupWithPodVolume runs backups of pods that are annotated for PodVolume backup,
@@ -3005,8 +3174,12 @@ func TestBackupWithPodVolume(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t)
req = &Request{Backup: tc.backup, SnapshotLocations: []*velerov1.VolumeSnapshotLocation{tc.vsl}}
h = newHarness(t)
req = &Request{
Backup: tc.backup,
SnapshotLocations: []*velerov1.VolumeSnapshotLocation{tc.vsl},
SkippedPVTracker: NewSkipPVTracker(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -4086,8 +4259,11 @@ func TestBackupNewResourceFiltering(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t)
req = &Request{Backup: tc.backup}
h = newHarness(t)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
}
backupFile = bytes.NewBuffer([]byte{})
)
@@ -4230,8 +4406,11 @@ func TestBackupNamespaces(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
h = newHarness(t)
req = &Request{Backup: tc.backup}
h = newHarness(t)
req = &Request{
Backup: tc.backup,
SkippedPVTracker: NewSkipPVTracker(),
}
backupFile = bytes.NewBuffer([]byte{})
)

View File

@@ -57,7 +57,10 @@ import (
const (
mustIncludeAdditionalItemAnnotation = "backup.velero.io/must-include-additional-items"
skippedNoCSIPVAnnotation = "backup.velero.io/skipped-no-csi-pv"
excludeFromBackupLabel = "velero.io/exclude-from-backup"
csiBIAPluginName = "velero.io/csi-pvc-backupper"
vsphereBIAPluginName = "velero.io/vsphere-pvc-backupper"
)
// itemBackupper can back up individual items to a tar writer.
@@ -125,6 +128,7 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
} else {
if metadata.GetLabels()[excludeFromBackupLabel] == "true" {
log.Infof("Excluding item because it has label %s=true", excludeFromBackupLabel)
ib.trackSkippedPV(obj, groupResource, "", fmt.Sprintf("item has label %s=true", excludeFromBackupLabel), log)
return false, itemFiles, nil
}
// NOTE: we have to re-check namespace & resource includes/excludes because it's possible that
@@ -181,6 +185,9 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
if err := ib.itemHookHandler.HandleHooks(log, groupResource, obj, ib.backupRequest.ResourceHooks, hook.PhasePre); err != nil {
return false, itemFiles, err
}
if optedOut, podName := ib.podVolumeSnapshotTracker.OptedoutByPod(namespace, name); optedOut {
ib.trackSkippedPV(obj, groupResource, podVolumeApproach, fmt.Sprintf("opted out due to annotation in pod %s", podName), log)
}
if groupResource == kuberesource.Pods {
// pod needs to be initialized for the unstructured converter
@@ -193,7 +200,8 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
// Get the list of volumes to back up using pod volume backup from the pod's annotations. Remove from this list
// any volumes that use a PVC that we've already backed up (this would be in a read-write-many scenario,
// where it's been backed up from another pod), since we don't need >1 backup per PVC.
for _, volume := range podvolume.GetVolumesByPod(pod, boolptr.IsSetToTrue(ib.backupRequest.Spec.DefaultVolumesToFsBackup)) {
includedVolumes, optedOutVolumes := podvolume.GetVolumesByPod(pod, boolptr.IsSetToTrue(ib.backupRequest.Spec.DefaultVolumesToFsBackup))
for _, volume := range includedVolumes {
// track the volumes that are PVCs using the PVC snapshot tracker, so that when we backup PVCs/PVs
// via an item action in the next step, we don't snapshot PVs that will have their data backed up
// with pod volume backup.
@@ -208,6 +216,9 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
}
pvbVolumes = append(pvbVolumes, volume)
}
for _, optedOutVol := range optedOutVolumes {
ib.podVolumeSnapshotTracker.Optout(pod, optedOutVol)
}
}
}
@@ -245,7 +256,7 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
if groupResource == kuberesource.Pods && pod != nil {
// this function will return partial results, so process podVolumeBackups
// even if there are errors.
podVolumeBackups, errs := ib.backupPodVolumes(log, pod, pvbVolumes)
podVolumeBackups, podVolumePVCBackupSummary, errs := ib.backupPodVolumes(log, pod, pvbVolumes)
ib.backupRequest.PodVolumeBackups = append(ib.backupRequest.PodVolumeBackups, podVolumeBackups...)
backupErrs = append(backupErrs, errs...)
@@ -254,6 +265,25 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
for _, pvb := range podVolumeBackups {
ib.podVolumeSnapshotTracker.Take(pod, pvb.Spec.Volume)
}
// Track/Untrack the volumes based on podVolumePVCBackupSummary
if podVolumePVCBackupSummary != nil {
for _, skippedPVC := range podVolumePVCBackupSummary.Skipped {
if obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(skippedPVC.PVC); err != nil {
backupErrs = append(backupErrs, errors.WithStack(err))
} else {
ib.trackSkippedPV(&unstructured.Unstructured{Object: obj}, kuberesource.PersistentVolumeClaims,
podVolumeApproach, skippedPVC.Reason, log)
}
}
for _, pvc := range podVolumePVCBackupSummary.Backedup {
if obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pvc); err != nil {
backupErrs = append(backupErrs, errors.WithStack(err))
} else {
ib.unTrackSkippedPV(&unstructured.Unstructured{Object: obj}, kuberesource.PersistentVolumeClaims, log)
}
}
}
}
log.Debug("Executing post hooks")
@@ -295,14 +325,14 @@ func getFileForArchive(namespace, name, groupResource, versionPath string, itemB
// backupPodVolumes triggers pod volume backups of the specified pod volumes, and returns a list of PodVolumeBackups
// for volumes that were successfully backed up, and a slice of any errors that were encountered.
func (ib *itemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *corev1api.Pod, volumes []string) ([]*velerov1api.PodVolumeBackup, []error) {
func (ib *itemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *corev1api.Pod, volumes []string) ([]*velerov1api.PodVolumeBackup, *podvolume.PVCBackupSummary, []error) {
if len(volumes) == 0 {
return nil, nil
return nil, nil, nil
}
if ib.podVolumeBackupper == nil {
log.Warn("No pod volume backupper, not backing up pod's volumes")
return nil, nil
return nil, nil, nil
}
return ib.podVolumeBackupper.BackupPodVolumes(ib.backupRequest.Backup, pod, volumes, ib.backupRequest.ResPolicies, log)
@@ -327,16 +357,22 @@ func (ib *itemBackupper) executeActions(
return nil, itemFiles, errors.WithStack(err)
} else if act != nil && act.Type == resourcepolicies.Skip {
log.Infof("Skip executing Backup Item Action: %s of resource %s: %s/%s for the matched resource policies", actionName, groupResource, namespace, name)
ib.trackSkippedPV(obj, groupResource, "", "skipped due to resource policy ", log)
continue
}
updatedItem, additionalItemIdentifiers, operationID, postOperationItems, err := action.Execute(obj, ib.backupRequest.Backup)
if err != nil {
return nil, itemFiles, errors.Wrapf(err, "error executing custom action (groupResource=%s, namespace=%s, name=%s)", groupResource.String(), namespace, name)
}
u := &unstructured.Unstructured{Object: updatedItem.UnstructuredContent()}
if actionName == csiBIAPluginName && additionalItemIdentifiers == nil && u.GetAnnotations()[skippedNoCSIPVAnnotation] == "true" {
// snapshot was skipped by CSI plugin
ib.trackSkippedPV(obj, groupResource, csiSnapshotApproach, "skipped b/c it's not a CSI volume", log)
delete(u.GetAnnotations(), skippedNoCSIPVAnnotation)
} else if actionName == csiBIAPluginName || actionName == vsphereBIAPluginName {
// the snapshot has been taken
ib.unTrackSkippedPV(obj, groupResource, log)
}
mustInclude := u.GetAnnotations()[mustIncludeAdditionalItemAnnotation] == "true" || finalize
// remove the annotation as it's for communication between BIA and velero server,
// we don't want the resource be restored with this annotation.
@@ -497,6 +533,8 @@ func (ib *itemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.Fie
return nil
} else if action != nil && action.Type == resourcepolicies.Skip {
log.Infof("skip snapshot of pv %s for the matched resource policies", pv.Name)
// at this point we are sure this object is PV therefore we'll call the tracker directly
ib.backupRequest.SkippedPVTracker.Track(pv.Name, volumeSnapshotApproach, "matched action is 'skip' in chosen resource policies")
return nil
}
}
@@ -551,6 +589,7 @@ func (ib *itemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.Fie
if volumeSnapshotter == nil {
// the PV may still has change to be snapshotted by CSI plugin's `PVCBackupItemAction` in PVC backup logic
log.Info("Persistent volume is not a supported volume type for Velero-native volumeSnapshotter snapshot, skipping.")
ib.backupRequest.SkippedPVTracker.Track(pv.Name, volumeSnapshotApproach, "no applicable volumesnapshotter found")
return nil
}
@@ -574,6 +613,7 @@ func (ib *itemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.Fie
snapshot := volumeSnapshot(ib.backupRequest.Backup, pv.Name, volumeID, volumeType, pvFailureDomainZone, location, iops)
var errs []error
ib.backupRequest.SkippedPVTracker.Untrack(pv.Name)
snapshotID, err := volumeSnapshotter.CreateSnapshot(snapshot.Spec.ProviderVolumeID, snapshot.Spec.VolumeAZ, tags)
if err != nil {
errs = append(errs, errors.Wrap(err, "error taking snapshot of volume"))
@@ -589,7 +629,7 @@ func (ib *itemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.Fie
}
func (ib *itemBackupper) getMatchAction(obj runtime.Unstructured, groupResource schema.GroupResource, backupItemActionName string) (*resourcepolicies.Action, error) {
if ib.backupRequest.ResPolicies != nil && groupResource == kuberesource.PersistentVolumeClaims && (backupItemActionName == "velero.io/csi-pvc-backupper" || backupItemActionName == "velero.io/vsphere-pvc-backupper") {
if ib.backupRequest.ResPolicies != nil && groupResource == kuberesource.PersistentVolumeClaims && (backupItemActionName == csiBIAPluginName || backupItemActionName == vsphereBIAPluginName) {
pvc := corev1api.PersistentVolumeClaim{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), &pvc); err != nil {
return nil, errors.WithStack(err)
@@ -609,6 +649,48 @@ func (ib *itemBackupper) getMatchAction(obj runtime.Unstructured, groupResource
return nil, nil
}
// trackSkippedPV tracks the skipped PV based on the object and the given approach and reason
// this function will be called throughout the process of backup, it needs to handle any object
func (ib *itemBackupper) trackSkippedPV(obj runtime.Unstructured, groupResource schema.GroupResource, approach string, reason string, log logrus.FieldLogger) {
if name, err := getPVName(obj, groupResource); len(name) > 0 && err == nil {
ib.backupRequest.SkippedPVTracker.Track(name, approach, reason)
} else if err != nil {
log.WithError(err).Warnf("unable to get PV name, skip tracking.")
}
}
// unTrackSkippedPV removes skipped PV based on the object from the tracker
// this function will be called throughout the process of backup, it needs to handle any object
func (ib *itemBackupper) unTrackSkippedPV(obj runtime.Unstructured, groupResource schema.GroupResource, log logrus.FieldLogger) {
if name, err := getPVName(obj, groupResource); len(name) > 0 && err == nil {
ib.backupRequest.SkippedPVTracker.Untrack(name)
} else if err != nil {
log.WithError(err).Warnf("unable to get PV name, skip untracking.")
}
}
// convert the input object to PV/PVC and get the PV name
func getPVName(obj runtime.Unstructured, groupResource schema.GroupResource) (string, error) {
if groupResource == kuberesource.PersistentVolumes {
pv := new(corev1api.PersistentVolume)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pv); err != nil {
return "", fmt.Errorf("failed to convert object to PV: %w", err)
}
return pv.Name, nil
}
if groupResource == kuberesource.PersistentVolumeClaims {
pvc := new(corev1api.PersistentVolumeClaim)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pvc); err != nil {
return "", fmt.Errorf("failed to convert object to PVC: %w", err)
}
if pvc.Spec.VolumeName == "" {
return "", fmt.Errorf("PV name is not set in PVC")
}
return pvc.Spec.VolumeName, nil
}
return "", nil
}
func volumeSnapshot(backup *velerov1api.Backup, volumeName, volumeID, volumeType, az, location string, iops *int64) *volume.Snapshot {
return &volume.Snapshot{
Spec: volume.SnapshotSpec{

View File

@@ -19,6 +19,11 @@ package backup
import (
"testing"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/stretchr/testify/assert"
corev1api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -170,3 +175,65 @@ func Test_zoneFromPVNodeAffinity(t *testing.T) {
})
}
}
func TestGetPVName(t *testing.T) {
testcases := []struct {
name string
obj metav1.Object
groupResource schema.GroupResource
pvName string
hasErr bool
}{
{
name: "pv should return pv name",
obj: builder.ForPersistentVolume("test-pv").Result(),
groupResource: kuberesource.PersistentVolumes,
pvName: "test-pv",
hasErr: false,
},
{
name: "pvc without volumeName should return error",
obj: builder.ForPersistentVolumeClaim("ns", "pvc-1").Result(),
groupResource: kuberesource.PersistentVolumeClaims,
pvName: "",
hasErr: true,
},
{
name: "pvc with volumeName should return pv name",
obj: builder.ForPersistentVolumeClaim("ns", "pvc-1").VolumeName("test-pv-2").Result(),
groupResource: kuberesource.PersistentVolumeClaims,
pvName: "test-pv-2",
hasErr: false,
},
{
name: "unsupported group resource should return empty pv name",
obj: builder.ForPod("ns", "pod1").Result(),
groupResource: kuberesource.Pods,
pvName: "",
hasErr: false,
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
o := &unstructured.Unstructured{Object: nil}
if tc.obj != nil {
data, err := runtime.DefaultUnstructuredConverter.ToUnstructured(tc.obj)
o = &unstructured.Unstructured{Object: data}
require.Nil(t, err)
}
name, err2 := getPVName(o, tc.groupResource)
assert.Equal(t, tc.pvName, name)
assert.Equal(t, tc.hasErr, err2 != nil)
})
}
}
func TestRandom(t *testing.T) {
pv := new(corev1api.PersistentVolume)
pvc := new(corev1api.PersistentVolumeClaim)
obj := builder.ForPod("ns1", "pod1").ServiceAccount("sa").Result()
o, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
err1 := runtime.DefaultUnstructuredConverter.FromUnstructured(o, pv)
err2 := runtime.DefaultUnstructuredConverter.FromUnstructured(o, pvc)
t.Logf("err1: %v, err2: %v", err1, err2)
}

View File

@@ -0,0 +1,96 @@
package backup
import (
"sort"
"sync"
)
type SkippedPV struct {
Name string `json:"name"`
Reasons []PVSkipReason `json:"reasons"`
}
type PVSkipReason struct {
Approach string `json:"approach"`
Reason string `json:"reason"`
}
// skipPVTracker keeps track of persistent volumes that have been skipped and the reason why they are skipped.
type skipPVTracker struct {
*sync.RWMutex
// pvs is a map of name of the pv to the list of reasons why it is skipped.
// The reasons are stored in a map each key of the map is the backup approach, each approach can have one reason
pvs map[string]map[string]string
}
const (
podVolumeApproach = "podvolume"
csiSnapshotApproach = "csiSnapshot"
volumeSnapshotApproach = "volumeSnapshot"
anyApproach = "any"
)
func NewSkipPVTracker() *skipPVTracker {
return &skipPVTracker{
RWMutex: &sync.RWMutex{},
pvs: make(map[string]map[string]string),
}
}
// Track tracks the pv with the specified name and the reason why it is skipped
func (pt *skipPVTracker) Track(name, approach, reason string) {
pt.Lock()
defer pt.Unlock()
if name == "" || reason == "" {
return
}
skipReasons := pt.pvs[name]
if skipReasons == nil {
skipReasons = make(map[string]string, 0)
pt.pvs[name] = skipReasons
}
if approach == "" {
approach = anyApproach
}
skipReasons[approach] = reason
}
// Untrack removes the pvc with the specified namespace and name.
func (pt *skipPVTracker) Untrack(name string) {
pt.Lock()
defer pt.Unlock()
delete(pt.pvs, name)
}
// Summary returns the summary of the tracked pvcs.
func (pt *skipPVTracker) Summary() []SkippedPV {
pt.RLock()
defer pt.RUnlock()
keys := make([]string, 0, len(pt.pvs))
for key := range pt.pvs {
keys = append(keys, key)
}
sort.Strings(keys)
res := make([]SkippedPV, 0, len(keys))
for _, key := range keys {
if skipReasons := pt.pvs[key]; len(skipReasons) > 0 {
entry := SkippedPV{
Name: key,
Reasons: make([]PVSkipReason, 0, len(skipReasons)),
}
approaches := make([]string, 0, len(skipReasons))
for a := range skipReasons {
approaches = append(approaches, a)
}
sort.Strings(approaches)
for _, a := range approaches {
entry.Reasons = append(entry.Reasons, PVSkipReason{
Approach: a,
Reason: skipReasons[a],
})
}
res = append(res, entry)
}
}
return res
}

View File

@@ -0,0 +1,43 @@
package backup
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestSummary(t *testing.T) {
tracker := NewSkipPVTracker()
tracker.Track("pv5", "", "skipped due to policy")
tracker.Track("pv3", podVolumeApproach, "it's set to opt-out")
tracker.Track("pv3", csiSnapshotApproach, "not applicable for CSI ")
// shouldn't be added
tracker.Track("", podVolumeApproach, "pvc3 is set to be skipped")
tracker.Track("pv10", volumeSnapshotApproach, "added by mistake")
tracker.Untrack("pv10")
expected := []SkippedPV{
{
Name: "pv3",
Reasons: []PVSkipReason{
{
Approach: csiSnapshotApproach,
Reason: "not applicable for CSI ",
},
{
Approach: podVolumeApproach,
Reason: "it's set to opt-out",
},
},
},
{
Name: "pv5",
Reasons: []PVSkipReason{
{
Approach: anyApproach,
Reason: "skipped due to policy",
},
},
},
}
assert.Equal(t, expected, tracker.Summary())
}

View File

@@ -0,0 +1,38 @@
package backup
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/vmware-tanzu/velero/pkg/builder"
)
func TestOptoutVolume(t *testing.T) {
pod := builder.ForPod("ns-1", "pod-1").Volumes(
builder.ForVolume("pod-vol-1").PersistentVolumeClaimSource("pvc-1").Result(),
builder.ForVolume("pod-vol-2").PersistentVolumeClaimSource("pvc-2").Result(),
).Result()
tracker := newPVCSnapshotTracker()
tracker.Optout(pod, "pod-vol-1")
ok, pn := tracker.OptedoutByPod("ns-1", "pvc-1")
assert.True(t, ok)
assert.Equal(t, "pod-1", pn)
// if a volume is tracked for opted out, it can't be tracked as "tracked" or "taken"
tracker.Track(pod, "pod-vol-1")
tracker.Track(pod, "pod-vol-2")
assert.False(t, tracker.Has("ns-1", "pvc-1"))
assert.True(t, tracker.Has("ns-1", "pvc-2"))
tracker.Take(pod, "pod-vol-1")
tracker.Take(pod, "pod-vol-2")
ok1, _ := tracker.TakenForPodVolume(pod, "pod-vol-1")
assert.False(t, ok1)
ok2, _ := tracker.TakenForPodVolume(pod, "pod-vol-2")
assert.True(t, ok2)
}
func TestABC(t *testing.T) {
tracker := newPVCSnapshotTracker()
v1, v2 := tracker.OptedoutByPod("a", "b")
t.Logf("v1: %v, v2: %v", v1, v2)
}

View File

@@ -22,43 +22,69 @@ import (
corev1api "k8s.io/api/core/v1"
)
// pvcSnapshotTracker keeps track of persistent volume claims that have been snapshotted
// with pod volume backup.
// pvcSnapshotTracker keeps track of persistent volume claims that have been handled
// via pod volume backup.
type pvcSnapshotTracker struct {
pvcs map[string]pvcSnapshotStatus
pvcs map[string]pvcSnapshotStatus
pvcPod map[string]string
}
type pvcSnapshotStatus struct {
taken bool
}
type pvcSnapshotStatus int
const (
pvcSnapshotStatusNotTracked pvcSnapshotStatus = -1
pvcSnapshotStatusTracked pvcSnapshotStatus = iota
pvcSnapshotStatusTaken
pvcSnapshotStatusOptedout
)
func newPVCSnapshotTracker() *pvcSnapshotTracker {
return &pvcSnapshotTracker{
pvcs: make(map[string]pvcSnapshotStatus),
// key: pvc ns/name, value: pod name
pvcPod: make(map[string]string),
}
}
// Track indicates a volume from a pod should be snapshotted by pod volume backup.
func (t *pvcSnapshotTracker) Track(pod *corev1api.Pod, volumeName string) {
// if the volume is a PVC, track it
for _, volume := range pod.Spec.Volumes {
if volume.Name == volumeName {
if volume.PersistentVolumeClaim != nil {
if _, ok := t.pvcs[key(pod.Namespace, volume.PersistentVolumeClaim.ClaimName)]; !ok {
t.pvcs[key(pod.Namespace, volume.PersistentVolumeClaim.ClaimName)] = pvcSnapshotStatus{false}
}
}
break
}
}
t.recordStatus(pod, volumeName, pvcSnapshotStatusTracked, pvcSnapshotStatusNotTracked)
}
// Take indicates a volume from a pod has been taken by pod volume backup.
func (t *pvcSnapshotTracker) Take(pod *corev1api.Pod, volumeName string) {
t.recordStatus(pod, volumeName, pvcSnapshotStatusTaken, pvcSnapshotStatusTracked)
}
// Optout indicates a volume from a pod has been opted out by pod's annotation
func (t *pvcSnapshotTracker) Optout(pod *corev1api.Pod, volumeName string) {
t.recordStatus(pod, volumeName, pvcSnapshotStatusOptedout, pvcSnapshotStatusNotTracked)
}
// OptedoutByPod returns true if the PVC with the specified namespace and name has been opted out by the pod. The
// second return value is the name of the pod which has the annotation that opted out the volume/pvc
func (t *pvcSnapshotTracker) OptedoutByPod(namespace, name string) (bool, string) {
status, found := t.pvcs[key(namespace, name)]
if !found || status != pvcSnapshotStatusOptedout {
return false, ""
}
return true, t.pvcPod[key(namespace, name)]
}
// if the volume is a PVC, record the status and the related pod
func (t *pvcSnapshotTracker) recordStatus(pod *corev1api.Pod, volumeName string, status pvcSnapshotStatus, preReqStatus pvcSnapshotStatus) {
for _, volume := range pod.Spec.Volumes {
if volume.Name == volumeName {
if volume.PersistentVolumeClaim != nil {
t.pvcs[key(pod.Namespace, volume.PersistentVolumeClaim.ClaimName)] = pvcSnapshotStatus{true}
t.pvcPod[key(pod.Namespace, volume.PersistentVolumeClaim.ClaimName)] = pod.Name
currStatus, ok := t.pvcs[key(pod.Namespace, volume.PersistentVolumeClaim.ClaimName)]
if !ok {
currStatus = pvcSnapshotStatusNotTracked
}
if currStatus == preReqStatus {
t.pvcs[key(pod.Namespace, volume.PersistentVolumeClaim.ClaimName)] = status
}
}
break
}
@@ -67,8 +93,8 @@ func (t *pvcSnapshotTracker) Take(pod *corev1api.Pod, volumeName string) {
// Has returns true if the PVC with the specified namespace and name has been tracked.
func (t *pvcSnapshotTracker) Has(namespace, name string) bool {
_, found := t.pvcs[key(namespace, name)]
return found
status, found := t.pvcs[key(namespace, name)]
return found && (status == pvcSnapshotStatusTracked || status == pvcSnapshotStatusTaken)
}
// TakenForPodVolume returns true and the PVC's name if the pod volume with the specified name uses a
@@ -88,7 +114,7 @@ func (t *pvcSnapshotTracker) TakenForPodVolume(pod *corev1api.Pod, volume string
return false, ""
}
if !status.taken {
if status != pvcSnapshotStatusTaken {
return false, ""
}

View File

@@ -54,6 +54,7 @@ type Request struct {
CSISnapshots []snapshotv1api.VolumeSnapshot
itemOperationsList *[]*itemoperation.BackupOperation
ResPolicies *resourcepolicies.Policies
SkippedPVTracker *skipPVTracker
}
// GetItemOperationsList returns ItemOperationsList, initializing it if necessary

View File

@@ -314,7 +314,8 @@ func (b *backupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
func (b *backupReconciler) prepareBackupRequest(backup *velerov1api.Backup, logger logrus.FieldLogger) *pkgbackup.Request {
request := &pkgbackup.Request{
Backup: backup.DeepCopy(), // don't modify items in the cache
Backup: backup.DeepCopy(), // don't modify items in the cache
SkippedPVTracker: pkgbackup.NewSkipPVTracker(),
}
// set backup major version - deprecated, use Status.FormatVersion
@@ -341,7 +342,7 @@ func (b *backupReconciler) prepareBackupRequest(backup *velerov1api.Backup, logg
// calculate expiration
request.Status.Expiration = &metav1.Time{Time: b.clock.Now().Add(request.Spec.TTL.Duration)}
// TODO: post v1.10. Remove this code block after DefaultVolumesToRestic is removed from CRD
// TODO: After we drop the support for backup v1 CR. Remove this code block after DefaultVolumesToRestic is removed from CRD
// For now, for CRs created by old versions, we need to respect the DefaultVolumesToRestic value if it is set true
if boolptr.IsSetToTrue(request.Spec.DefaultVolumesToRestic) {
logger.Warn("DefaultVolumesToRestic field will be deprecated, use DefaultVolumesToFsBackup instead. Automatically remap it to DefaultVolumesToFsBackup")

View File

@@ -140,8 +140,9 @@ func (r *backupFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
backupRequest := &pkgbackup.Request{
Backup: backup,
StorageLocation: location,
Backup: backup,
StorageLocation: location,
SkippedPVTracker: pkgbackup.NewSkipPVTracker(),
}
var outBackupFile *os.File
if len(operations) > 0 {

View File

@@ -42,7 +42,7 @@ import (
// Backupper can execute pod volume backups of volumes in a pod.
type Backupper interface {
// BackupPodVolumes backs up all specified volumes in a pod.
BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, []error)
BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error)
}
type backupper struct {
@@ -59,6 +59,45 @@ type backupper struct {
resultsLock sync.Mutex
}
type skippedPVC struct {
PVC *corev1api.PersistentVolumeClaim
Reason string
}
// PVCBackupSummary is a summary for which PVCs are skipped, which are backed up after each execution of the Backupper
// The scope should be within one pod, so the volume name is the key for the maps
type PVCBackupSummary struct {
Backedup map[string]*corev1api.PersistentVolumeClaim
Skipped map[string]*skippedPVC
pvcMap map[string]*corev1api.PersistentVolumeClaim
}
func NewPVCBackupSummary() *PVCBackupSummary {
return &PVCBackupSummary{
Backedup: make(map[string]*corev1api.PersistentVolumeClaim),
Skipped: make(map[string]*skippedPVC),
pvcMap: make(map[string]*corev1api.PersistentVolumeClaim),
}
}
func (pbs *PVCBackupSummary) addBackedup(volumeName string) {
if pvc, ok := pbs.pvcMap[volumeName]; ok {
pbs.Backedup[volumeName] = pvc
delete(pbs.Skipped, volumeName)
}
}
func (pbs *PVCBackupSummary) addSkipped(volumeName string, reason string) {
if pvc, ok := pbs.pvcMap[volumeName]; ok {
if _, ok2 := pbs.Backedup[volumeName]; !ok2 { // if it's not backed up, add it to skipped
pbs.Skipped[volumeName] = &skippedPVC{
PVC: pvc,
Reason: reason,
}
}
}
}
func newBackupper(
ctx context.Context,
repoLocker *repository.RepoLocker,
@@ -127,35 +166,26 @@ func (b *backupper) getMatchAction(resPolicies *resourcepolicies.Policies, pvc *
return nil, errors.Errorf("failed to check resource policies for empty volume")
}
func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, []error) {
func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error) {
if len(volumesToBackup) == 0 {
return nil, nil
return nil, nil, nil
}
log.Infof("pod %s/%s has volumes to backup: %v", pod.Namespace, pod.Name, volumesToBackup)
err := kube.IsPodRunning(pod)
if err != nil {
for _, volumeName := range volumesToBackup {
err = errors.Wrapf(err, "backup for volume %s is skipped", volumeName)
log.WithError(err).Warn("Skip pod volume")
}
return nil, nil
}
err = nodeagent.IsRunningInNode(b.ctx, backup.Namespace, pod.Spec.NodeName, b.podClient)
err := nodeagent.IsRunningInNode(b.ctx, backup.Namespace, pod.Spec.NodeName, b.podClient)
if err != nil {
return nil, []error{err}
return nil, nil, []error{err}
}
repositoryType := getRepositoryType(b.uploaderType)
if repositoryType == "" {
err := errors.Errorf("empty repository type, uploader %s", b.uploaderType)
return nil, []error{err}
return nil, nil, []error{err}
}
repo, err := b.repoEnsurer.EnsureRepo(b.ctx, backup.Namespace, pod.Namespace, backup.Spec.StorageLocation, repositoryType)
if err != nil {
return nil, []error{err}
return nil, nil, []error{err}
}
// get a single non-exclusive lock since we'll wait for all individual
@@ -175,10 +205,28 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
podVolumes = make(map[string]corev1api.Volume)
mountedPodVolumes = sets.String{}
)
pvcSummary := NewPVCBackupSummary()
// put the pod's volumes in a map for efficient lookup below
// put the pod's volumes and the PVC associated in maps for efficient lookup below
for _, podVolume := range pod.Spec.Volumes {
podVolumes[podVolume.Name] = podVolume
if podVolume.PersistentVolumeClaim != nil {
pvc, err := b.pvcClient.PersistentVolumeClaims(pod.Namespace).Get(context.TODO(), podVolume.PersistentVolumeClaim.ClaimName, metav1.GetOptions{})
if err != nil {
errs = append(errs, errors.Wrap(err, "error getting persistent volume claim for volume"))
continue
}
pvcSummary.pvcMap[podVolume.Name] = pvc
}
}
if err := kube.IsPodRunning(pod); err != nil {
for _, volumeName := range volumesToBackup {
err := errors.Wrapf(err, "backup for volume %s is skipped", volumeName)
log.WithError(err).Warn("Skip pod volume")
pvcSummary.addSkipped(volumeName, fmt.Sprintf("the pod the PVC is mounted to, %s/%s, is not running", pod.Namespace, pod.Name))
}
return nil, pvcSummary, nil
}
for _, container := range pod.Spec.Containers {
@@ -194,12 +242,11 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
log.Warnf("No volume named %s found in pod %s/%s, skipping", volumeName, pod.Namespace, pod.Name)
continue
}
var pvc *corev1api.PersistentVolumeClaim
if volume.PersistentVolumeClaim != nil {
pvc, err = b.pvcClient.PersistentVolumeClaims(pod.Namespace).Get(context.TODO(), volume.PersistentVolumeClaim.ClaimName, metav1.GetOptions{})
if err != nil {
errs = append(errs, errors.Wrap(err, "error getting persistent volume claim for volume"))
pvc, ok = pvcSummary.pvcMap[volumeName]
if !ok {
// there should have been error happened retrieving the PVC and it's recorded already
continue
}
}
@@ -219,7 +266,9 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
// volumes that are not mounted by any container should not be backed up, because
// its directory is not created
if !mountedPodVolumes.Has(volumeName) {
log.Warnf("Volume %s is declared in pod %s/%s but not mounted by any container, skipping", volumeName, pod.Namespace, pod.Name)
msg := fmt.Sprintf("volume %s is declared in pod %s/%s but not mounted by any container, skipping", volumeName, pod.Namespace, pod.Name)
log.Warn(msg)
pvcSummary.addSkipped(volumeName, msg)
continue
}
@@ -229,6 +278,7 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
continue
} else if action != nil && action.Type == resourcepolicies.Skip {
log.Infof("skip backup of volume %s for the matched resource policies", volumeName)
pvcSummary.addSkipped(volumeName, "matched action is 'skip' in chosen resource policies")
continue
}
}
@@ -238,6 +288,7 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
errs = append(errs, err)
continue
}
pvcSummary.addBackedup(volumeName)
numVolumeSnapshots++
}
@@ -262,7 +313,7 @@ ForEachVolume:
delete(b.results, resultsKey(pod.Namespace, pod.Name))
b.resultsLock.Unlock()
return podVolumeBackups, errs
return podVolumeBackups, pvcSummary, errs
}
type pvGetter interface {

View File

@@ -238,9 +238,9 @@ func createBackupRepoObj() *velerov1api.BackupRepository {
func createPodObj(running bool, withVolume bool, withVolumeMounted bool, volumeNum int) *corev1api.Pod {
podObj := builder.ForPod("fake-ns", "fake-pod").Result()
podObj.Spec.NodeName = "fake-node-name"
if running {
podObj.Status.Phase = corev1api.PodRunning
podObj.Spec.NodeName = "fake-node-name"
}
if withVolume {
@@ -354,7 +354,16 @@ func TestBackupPodVolumes(t *testing.T) {
"fake-volume-1",
"fake-volume-2",
},
sourcePod: createPodObj(false, false, false, 2),
kubeClientObj: []runtime.Object{
createNodeAgentPodObj(true),
},
ctlClientObj: []runtime.Object{
createBackupRepoObj(),
},
runtimeScheme: scheme,
sourcePod: createPodObj(false, false, false, 2),
uploaderType: "kopia",
bsl: "fake-bsl",
},
{
name: "node-agent pod is not running in node",
@@ -608,7 +617,7 @@ func TestBackupPodVolumes(t *testing.T) {
},
},
}
// TODO add more verification around PVCBackupSummary returned by "BackupPodVolumes"
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
@@ -655,7 +664,7 @@ func TestBackupPodVolumes(t *testing.T) {
}
}()
pvbs, errs := bp.BackupPodVolumes(backupObj, test.sourcePod, test.volumes, nil, velerotest.NewLogger())
pvbs, _, errs := bp.BackupPodVolumes(backupObj, test.sourcePod, test.volumes, nil, velerotest.NewLogger())
if errs == nil {
assert.Nil(t, test.errs)
@@ -669,3 +678,30 @@ func TestBackupPodVolumes(t *testing.T) {
})
}
}
func TestPVCBackupSummary(t *testing.T) {
pbs := NewPVCBackupSummary()
pbs.pvcMap["vol-1"] = builder.ForPersistentVolumeClaim("ns-1", "pvc-1").VolumeName("pv-1").Result()
pbs.pvcMap["vol-2"] = builder.ForPersistentVolumeClaim("ns-2", "pvc-2").VolumeName("pv-2").Result()
// it won't be added if the volme is not in the pvc map.
pbs.addSkipped("vol-3", "whatever reason")
assert.Equal(t, 0, len(pbs.Skipped))
pbs.addBackedup("vol-3")
assert.Equal(t, 0, len(pbs.Backedup))
// only can be added as skipped when it's not in backedup set
pbs.addBackedup("vol-1")
assert.Equal(t, 1, len(pbs.Backedup))
assert.Equal(t, "pvc-1", pbs.Backedup["vol-1"].Name)
pbs.addSkipped("vol-1", "whatever reason")
assert.Equal(t, 0, len(pbs.Skipped))
pbs.addSkipped("vol-2", "vol-2 has to be skipped")
assert.Equal(t, 1, len(pbs.Skipped))
assert.Equal(t, "pvc-2", pbs.Skipped["vol-2"].PVC.Name)
// adding a vol as backedup removes it from skipped set
pbs.addBackedup("vol-2")
assert.Equal(t, 0, len(pbs.Skipped))
assert.Equal(t, 2, len(pbs.Backedup))
}

View File

@@ -253,9 +253,12 @@ func contains(list []string, k string) bool {
}
// GetVolumesByPod returns a list of volume names to backup for the provided pod.
func GetVolumesByPod(pod *corev1api.Pod, defaultVolumesToFsBackup bool) []string {
func GetVolumesByPod(pod *corev1api.Pod, defaultVolumesToFsBackup bool) ([]string, []string) {
// tracks the volumes that have been explicitly opted out of backup via the annotation in the pod
optedOutVolumes := make([]string, 0)
if !defaultVolumesToFsBackup {
return GetVolumesToBackup(pod)
return GetVolumesToBackup(pod), optedOutVolumes
}
volsToExclude := getVolumesToExclude(pod)
@@ -284,6 +287,7 @@ func GetVolumesByPod(pod *corev1api.Pod, defaultVolumesToFsBackup bool) []string
}
// don't backup volumes that are included in the exclude list.
if contains(volsToExclude, pv.Name) {
optedOutVolumes = append(optedOutVolumes, pv.Name)
continue
}
// don't include volumes that mount the default service account token.
@@ -292,5 +296,5 @@ func GetVolumesByPod(pod *corev1api.Pod, defaultVolumesToFsBackup bool) []string
}
podVolumes = append(podVolumes, pv.Name)
}
return podVolumes
return podVolumes, optedOutVolumes
}

View File

@@ -350,9 +350,12 @@ func TestGetVolumesToBackup(t *testing.T) {
func TestGetVolumesByPod(t *testing.T) {
testCases := []struct {
name string
pod *corev1api.Pod
expected []string
name string
pod *corev1api.Pod
expected struct {
included []string
optedOut []string
}
defaultVolumesToFsBackup bool
}{
{
@@ -365,7 +368,13 @@ func TestGetVolumesByPod(t *testing.T) {
},
},
},
expected: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
expected: struct {
included []string
optedOut []string
}{
included: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
optedOut: []string{},
},
},
{
name: "should get all pod volumes when defaultVolumesToFsBackup is true and no PVs are excluded",
@@ -378,7 +387,13 @@ func TestGetVolumesByPod(t *testing.T) {
},
},
},
expected: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
expected: struct {
included []string
optedOut []string
}{
included: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
optedOut: []string{},
},
},
{
name: "should get all pod volumes except ones excluded when defaultVolumesToFsBackup is true",
@@ -398,7 +413,13 @@ func TestGetVolumesByPod(t *testing.T) {
},
},
},
expected: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
expected: struct {
included []string
optedOut []string
}{
included: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
optedOut: []string{"nonPvbPV1", "nonPvbPV2", "nonPvbPV3"},
},
},
{
name: "should exclude default service account token from pod volume backup",
@@ -413,7 +434,13 @@ func TestGetVolumesByPod(t *testing.T) {
},
},
},
expected: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
expected: struct {
included []string
optedOut []string
}{
included: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
optedOut: []string{},
},
},
{
name: "should exclude host path volumes from pod volume backups",
@@ -435,7 +462,13 @@ func TestGetVolumesByPod(t *testing.T) {
},
},
},
expected: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
expected: struct {
included []string
optedOut []string
}{
included: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
optedOut: []string{"nonPvbPV1", "nonPvbPV2", "nonPvbPV3"},
},
},
{
name: "should exclude volumes mounting secrets",
@@ -457,7 +490,13 @@ func TestGetVolumesByPod(t *testing.T) {
},
},
},
expected: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
expected: struct {
included []string
optedOut []string
}{
included: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
optedOut: []string{"nonPvbPV1", "nonPvbPV2", "nonPvbPV3"},
},
},
{
name: "should exclude volumes mounting config maps",
@@ -479,7 +518,13 @@ func TestGetVolumesByPod(t *testing.T) {
},
},
},
expected: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
expected: struct {
included []string
optedOut []string
}{
included: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
optedOut: []string{"nonPvbPV1", "nonPvbPV2", "nonPvbPV3"},
},
},
{
name: "should exclude projected volumes",
@@ -514,7 +559,13 @@ func TestGetVolumesByPod(t *testing.T) {
},
},
},
expected: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
expected: struct {
included []string
optedOut []string
}{
included: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
optedOut: []string{},
},
},
{
name: "should exclude DownwardAPI volumes",
@@ -547,17 +598,27 @@ func TestGetVolumesByPod(t *testing.T) {
},
},
},
expected: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
expected: struct {
included []string
optedOut []string
}{
included: []string{"pvbPV1", "pvbPV2", "pvbPV3"},
optedOut: []string{},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual := GetVolumesByPod(tc.pod, tc.defaultVolumesToFsBackup)
actualIncluded, actualOptedOut := GetVolumesByPod(tc.pod, tc.defaultVolumesToFsBackup)
sort.Strings(tc.expected)
sort.Strings(actual)
assert.Equal(t, tc.expected, actual)
sort.Strings(tc.expected.included)
sort.Strings(actualIncluded)
assert.Equal(t, tc.expected.included, actualIncluded)
sort.Strings(tc.expected.optedOut)
sort.Strings(actualOptedOut)
assert.Equal(t, tc.expected.optedOut, actualOptedOut)
})
}
}