mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-01-03 11:45:20 +00:00
Merge branch 'main' into vgdp-ms-pvr-data-path
This commit is contained in:
313
pkg/podvolume/backup_micro_service.go
Normal file
313
pkg/podvolume/backup_micro_service.go
Normal file
@@ -0,0 +1,313 @@
|
||||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package podvolume
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
cachetool "k8s.io/client-go/tools/cache"
|
||||
"sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/datapath"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
podVolumeRequestor = "snapshot-pod-volume"
|
||||
)
|
||||
|
||||
// BackupMicroService process data mover backups inside the backup pod
|
||||
type BackupMicroService struct {
|
||||
ctx context.Context
|
||||
client client.Client
|
||||
kubeClient kubernetes.Interface
|
||||
repoEnsurer *repository.Ensurer
|
||||
credentialGetter *credentials.CredentialGetter
|
||||
logger logrus.FieldLogger
|
||||
dataPathMgr *datapath.Manager
|
||||
eventRecorder kube.EventRecorder
|
||||
|
||||
namespace string
|
||||
pvbName string
|
||||
pvb *velerov1api.PodVolumeBackup
|
||||
sourceTargetPath datapath.AccessPoint
|
||||
|
||||
resultSignal chan dataPathResult
|
||||
|
||||
pvbInformer cache.Informer
|
||||
pvbHandler cachetool.ResourceEventHandlerRegistration
|
||||
nodeName string
|
||||
}
|
||||
|
||||
type dataPathResult struct {
|
||||
err error
|
||||
result string
|
||||
}
|
||||
|
||||
func NewBackupMicroService(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, pvbName string, namespace string, nodeName string,
|
||||
sourceTargetPath datapath.AccessPoint, dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, cred *credentials.CredentialGetter,
|
||||
pvbInformer cache.Informer, log logrus.FieldLogger) *BackupMicroService {
|
||||
return &BackupMicroService{
|
||||
ctx: ctx,
|
||||
client: client,
|
||||
kubeClient: kubeClient,
|
||||
credentialGetter: cred,
|
||||
logger: log,
|
||||
repoEnsurer: repoEnsurer,
|
||||
dataPathMgr: dataPathMgr,
|
||||
namespace: namespace,
|
||||
pvbName: pvbName,
|
||||
sourceTargetPath: sourceTargetPath,
|
||||
nodeName: nodeName,
|
||||
resultSignal: make(chan dataPathResult),
|
||||
pvbInformer: pvbInformer,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *BackupMicroService) Init() error {
|
||||
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.pvbName, r.nodeName, r.logger)
|
||||
|
||||
handler, err := r.pvbInformer.AddEventHandler(
|
||||
cachetool.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(oldObj any, newObj any) {
|
||||
oldPvb := oldObj.(*velerov1api.PodVolumeBackup)
|
||||
newPvb := newObj.(*velerov1api.PodVolumeBackup)
|
||||
|
||||
if newPvb.Name != r.pvbName {
|
||||
return
|
||||
}
|
||||
|
||||
if newPvb.Status.Phase != velerov1api.PodVolumeBackupPhaseInProgress {
|
||||
return
|
||||
}
|
||||
|
||||
if newPvb.Spec.Cancel && !oldPvb.Spec.Cancel {
|
||||
r.cancelPodVolumeBackup(newPvb)
|
||||
}
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error adding PVB handler")
|
||||
}
|
||||
|
||||
r.pvbHandler = handler
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string, error) {
|
||||
log := r.logger.WithFields(logrus.Fields{
|
||||
"PVB": r.pvbName,
|
||||
})
|
||||
|
||||
pvb := &velerov1api.PodVolumeBackup{}
|
||||
err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) {
|
||||
err := r.client.Get(ctx, types.NamespacedName{
|
||||
Namespace: r.namespace,
|
||||
Name: r.pvbName,
|
||||
}, pvb)
|
||||
|
||||
if apierrors.IsNotFound(err) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return true, errors.Wrapf(err, "error to get PVB %s", r.pvbName)
|
||||
}
|
||||
|
||||
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseInProgress {
|
||||
return true, nil
|
||||
} else {
|
||||
return false, nil
|
||||
}
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to wait PVB")
|
||||
return "", errors.Wrap(err, "error waiting for PVB")
|
||||
}
|
||||
|
||||
r.pvb = pvb
|
||||
|
||||
log.Info("Run cancelable PVB")
|
||||
|
||||
callbacks := datapath.Callbacks{
|
||||
OnCompleted: r.OnDataPathCompleted,
|
||||
OnFailed: r.OnDataPathFailed,
|
||||
OnCancelled: r.OnDataPathCancelled,
|
||||
OnProgress: r.OnDataPathProgress,
|
||||
}
|
||||
|
||||
fsBackup, err := r.dataPathMgr.CreateFileSystemBR(pvb.Name, podVolumeRequestor, ctx, r.client, pvb.Namespace, callbacks, log)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "error to create data path")
|
||||
}
|
||||
|
||||
log.Debug("Async fs br created")
|
||||
|
||||
if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{
|
||||
BSLName: pvb.Spec.BackupStorageLocation,
|
||||
SourceNamespace: pvb.Spec.Pod.Namespace,
|
||||
UploaderType: pvb.Spec.UploaderType,
|
||||
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
|
||||
RepoIdentifier: "",
|
||||
RepositoryEnsurer: r.repoEnsurer,
|
||||
CredentialGetter: r.credentialGetter,
|
||||
}); err != nil {
|
||||
return "", errors.Wrap(err, "error to initialize data path")
|
||||
}
|
||||
|
||||
log.Info("Async fs br init")
|
||||
|
||||
tags := map[string]string{}
|
||||
|
||||
if err := fsBackup.StartBackup(r.sourceTargetPath, pvb.Spec.UploaderSettings, &datapath.FSBRStartParam{
|
||||
RealSource: GetRealSource(pvb),
|
||||
ParentSnapshot: "",
|
||||
ForceFull: false,
|
||||
Tags: tags,
|
||||
}); err != nil {
|
||||
return "", errors.Wrap(err, "error starting data path backup")
|
||||
}
|
||||
|
||||
log.Info("Async fs backup data path started")
|
||||
r.eventRecorder.Event(pvb, false, datapath.EventReasonStarted, "Data path for %s started", pvb.Name)
|
||||
|
||||
result := ""
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = errors.New("timed out waiting for fs backup to complete")
|
||||
break
|
||||
case res := <-r.resultSignal:
|
||||
err = res.err
|
||||
result = res.result
|
||||
break
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Async fs backup was not completed")
|
||||
}
|
||||
|
||||
r.eventRecorder.EndingEvent(pvb, false, datapath.EventReasonStopped, "Data path for %s stopped", pvb.Name)
|
||||
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (r *BackupMicroService) Shutdown() {
|
||||
r.eventRecorder.Shutdown()
|
||||
r.closeDataPath(r.ctx, r.pvbName)
|
||||
|
||||
if r.pvbHandler != nil {
|
||||
if err := r.pvbInformer.RemoveEventHandler(r.pvbHandler); err != nil {
|
||||
r.logger.WithError(err).Warn("Failed to remove pod handler")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var funcMarshal = json.Marshal
|
||||
|
||||
func (r *BackupMicroService) OnDataPathCompleted(ctx context.Context, namespace string, pvbName string, result datapath.Result) {
|
||||
log := r.logger.WithField("PVB", pvbName)
|
||||
|
||||
backupBytes, err := funcMarshal(result.Backup)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("Failed to marshal backup result %v", result.Backup)
|
||||
r.resultSignal <- dataPathResult{
|
||||
err: errors.Wrapf(err, "Failed to marshal backup result %v", result.Backup),
|
||||
}
|
||||
} else {
|
||||
r.eventRecorder.Event(r.pvb, false, datapath.EventReasonCompleted, string(backupBytes))
|
||||
r.resultSignal <- dataPathResult{
|
||||
result: string(backupBytes),
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("Async fs backup completed")
|
||||
}
|
||||
|
||||
func (r *BackupMicroService) OnDataPathFailed(ctx context.Context, namespace string, pvbName string, err error) {
|
||||
log := r.logger.WithField("PVB", pvbName)
|
||||
log.WithError(err).Error("Async fs backup data path failed")
|
||||
|
||||
r.eventRecorder.Event(r.pvb, false, datapath.EventReasonFailed, "Data path for PVB %s failed, error %v", r.pvbName, err)
|
||||
r.resultSignal <- dataPathResult{
|
||||
err: errors.Wrapf(err, "Data path for PVB %s failed", r.pvbName),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *BackupMicroService) OnDataPathCancelled(ctx context.Context, namespace string, pvbName string) {
|
||||
log := r.logger.WithField("PVB", pvbName)
|
||||
log.Warn("Async fs backup data path canceled")
|
||||
|
||||
r.eventRecorder.Event(r.pvb, false, datapath.EventReasonCancelled, "Data path for PVB %s canceled", pvbName)
|
||||
r.resultSignal <- dataPathResult{
|
||||
err: errors.New(datapath.ErrCancelled),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *BackupMicroService) OnDataPathProgress(ctx context.Context, namespace string, pvbName string, progress *uploader.Progress) {
|
||||
log := r.logger.WithFields(logrus.Fields{
|
||||
"PVB": pvbName,
|
||||
})
|
||||
|
||||
progressBytes, err := funcMarshal(progress)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("Failed to marshal progress %v", progress)
|
||||
return
|
||||
}
|
||||
|
||||
r.eventRecorder.Event(r.pvb, false, datapath.EventReasonProgress, string(progressBytes))
|
||||
}
|
||||
|
||||
func (r *BackupMicroService) closeDataPath(ctx context.Context, duName string) {
|
||||
fsBackup := r.dataPathMgr.GetAsyncBR(duName)
|
||||
if fsBackup != nil {
|
||||
fsBackup.Close(ctx)
|
||||
}
|
||||
|
||||
r.dataPathMgr.RemoveAsyncBR(duName)
|
||||
}
|
||||
|
||||
func (r *BackupMicroService) cancelPodVolumeBackup(pvb *velerov1api.PodVolumeBackup) {
|
||||
r.logger.WithField("PVB", pvb.Name).Info("PVB is being canceled")
|
||||
|
||||
r.eventRecorder.Event(pvb, false, datapath.EventReasonCancelling, "Canceling for PVB %s", pvb.Name)
|
||||
|
||||
fsBackup := r.dataPathMgr.GetAsyncBR(pvb.Name)
|
||||
if fsBackup == nil {
|
||||
r.OnDataPathCancelled(r.ctx, pvb.GetNamespace(), pvb.GetName())
|
||||
} else {
|
||||
fsBackup.Cancel()
|
||||
}
|
||||
}
|
||||
447
pkg/podvolume/backup_micro_service_test.go
Normal file
447
pkg/podvolume/backup_micro_service_test.go
Normal file
@@ -0,0 +1,447 @@
|
||||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package podvolume
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/builder"
|
||||
"github.com/vmware-tanzu/velero/pkg/datapath"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
|
||||
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
|
||||
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||
|
||||
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
|
||||
)
|
||||
|
||||
type backupMsTestHelper struct {
|
||||
eventReason string
|
||||
eventMsg string
|
||||
marshalErr error
|
||||
marshalBytes []byte
|
||||
withEvent bool
|
||||
eventLock sync.Mutex
|
||||
}
|
||||
|
||||
func (bt *backupMsTestHelper) Event(_ runtime.Object, _ bool, reason string, message string, a ...any) {
|
||||
bt.eventLock.Lock()
|
||||
defer bt.eventLock.Unlock()
|
||||
|
||||
bt.withEvent = true
|
||||
bt.eventReason = reason
|
||||
bt.eventMsg = fmt.Sprintf(message, a...)
|
||||
}
|
||||
|
||||
func (bt *backupMsTestHelper) EndingEvent(_ runtime.Object, _ bool, reason string, message string, a ...any) {
|
||||
bt.eventLock.Lock()
|
||||
defer bt.eventLock.Unlock()
|
||||
|
||||
bt.withEvent = true
|
||||
bt.eventReason = reason
|
||||
bt.eventMsg = fmt.Sprintf(message, a...)
|
||||
}
|
||||
func (bt *backupMsTestHelper) Shutdown() {}
|
||||
|
||||
func (bt *backupMsTestHelper) Marshal(v any) ([]byte, error) {
|
||||
if bt.marshalErr != nil {
|
||||
return nil, bt.marshalErr
|
||||
}
|
||||
|
||||
return bt.marshalBytes, nil
|
||||
}
|
||||
|
||||
func (bt *backupMsTestHelper) EventReason() string {
|
||||
bt.eventLock.Lock()
|
||||
defer bt.eventLock.Unlock()
|
||||
|
||||
return bt.eventReason
|
||||
}
|
||||
|
||||
func (bt *backupMsTestHelper) EventMessage() string {
|
||||
bt.eventLock.Lock()
|
||||
defer bt.eventLock.Unlock()
|
||||
|
||||
return bt.eventMsg
|
||||
}
|
||||
|
||||
func TestOnDataPathFailed(t *testing.T) {
|
||||
pvbName := "fake-pvb"
|
||||
bt := &backupMsTestHelper{}
|
||||
|
||||
bs := &BackupMicroService{
|
||||
pvbName: pvbName,
|
||||
dataPathMgr: datapath.NewManager(1),
|
||||
eventRecorder: bt,
|
||||
resultSignal: make(chan dataPathResult),
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
expectedErr := "Data path for PVB fake-pvb failed: fake-error"
|
||||
expectedEventReason := datapath.EventReasonFailed
|
||||
expectedEventMsg := "Data path for PVB fake-pvb failed, error fake-error"
|
||||
|
||||
go bs.OnDataPathFailed(context.TODO(), velerov1api.DefaultNamespace, pvbName, errors.New("fake-error"))
|
||||
|
||||
result := <-bs.resultSignal
|
||||
assert.EqualError(t, result.err, expectedErr)
|
||||
assert.Equal(t, expectedEventReason, bt.EventReason())
|
||||
assert.Equal(t, expectedEventMsg, bt.EventMessage())
|
||||
}
|
||||
|
||||
func TestOnDataPathCancelled(t *testing.T) {
|
||||
pvbName := "fake-pvb"
|
||||
bt := &backupMsTestHelper{}
|
||||
|
||||
bs := &BackupMicroService{
|
||||
pvbName: pvbName,
|
||||
dataPathMgr: datapath.NewManager(1),
|
||||
eventRecorder: bt,
|
||||
resultSignal: make(chan dataPathResult),
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
expectedErr := datapath.ErrCancelled
|
||||
expectedEventReason := datapath.EventReasonCancelled
|
||||
expectedEventMsg := "Data path for PVB fake-pvb canceled"
|
||||
|
||||
go bs.OnDataPathCancelled(context.TODO(), velerov1api.DefaultNamespace, pvbName)
|
||||
|
||||
result := <-bs.resultSignal
|
||||
assert.EqualError(t, result.err, expectedErr)
|
||||
assert.Equal(t, expectedEventReason, bt.EventReason())
|
||||
assert.Equal(t, expectedEventMsg, bt.EventMessage())
|
||||
}
|
||||
|
||||
func TestOnDataPathCompleted(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
expectedErr string
|
||||
expectedEventReason string
|
||||
expectedEventMsg string
|
||||
marshalErr error
|
||||
marshallStr string
|
||||
}{
|
||||
{
|
||||
name: "marshal fail",
|
||||
marshalErr: errors.New("fake-marshal-error"),
|
||||
expectedErr: "Failed to marshal backup result { false { } 0}: fake-marshal-error",
|
||||
},
|
||||
{
|
||||
name: "succeed",
|
||||
marshallStr: "fake-complete-string",
|
||||
expectedEventReason: datapath.EventReasonCompleted,
|
||||
expectedEventMsg: "fake-complete-string",
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
pvbName := "fake-pvb"
|
||||
|
||||
bt := &backupMsTestHelper{
|
||||
marshalErr: test.marshalErr,
|
||||
marshalBytes: []byte(test.marshallStr),
|
||||
}
|
||||
|
||||
bs := &BackupMicroService{
|
||||
dataPathMgr: datapath.NewManager(1),
|
||||
eventRecorder: bt,
|
||||
resultSignal: make(chan dataPathResult),
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
funcMarshal = bt.Marshal
|
||||
|
||||
go bs.OnDataPathCompleted(context.TODO(), velerov1api.DefaultNamespace, pvbName, datapath.Result{})
|
||||
|
||||
result := <-bs.resultSignal
|
||||
if test.marshalErr != nil {
|
||||
assert.EqualError(t, result.err, test.expectedErr)
|
||||
} else {
|
||||
assert.NoError(t, result.err)
|
||||
assert.Equal(t, test.expectedEventReason, bt.EventReason())
|
||||
assert.Equal(t, test.expectedEventMsg, bt.EventMessage())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestOnDataPathProgress(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
expectedErr string
|
||||
expectedEventReason string
|
||||
expectedEventMsg string
|
||||
marshalErr error
|
||||
marshallStr string
|
||||
}{
|
||||
{
|
||||
name: "marshal fail",
|
||||
marshalErr: errors.New("fake-marshal-error"),
|
||||
expectedErr: "Failed to marshal backup result",
|
||||
},
|
||||
{
|
||||
name: "succeed",
|
||||
marshallStr: "fake-progress-string",
|
||||
expectedEventReason: datapath.EventReasonProgress,
|
||||
expectedEventMsg: "fake-progress-string",
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
pvbName := "fake-pvb"
|
||||
|
||||
bt := &backupMsTestHelper{
|
||||
marshalErr: test.marshalErr,
|
||||
marshalBytes: []byte(test.marshallStr),
|
||||
}
|
||||
|
||||
bs := &BackupMicroService{
|
||||
dataPathMgr: datapath.NewManager(1),
|
||||
eventRecorder: bt,
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
funcMarshal = bt.Marshal
|
||||
|
||||
bs.OnDataPathProgress(context.TODO(), velerov1api.DefaultNamespace, pvbName, &uploader.Progress{})
|
||||
|
||||
if test.marshalErr != nil {
|
||||
assert.False(t, bt.withEvent)
|
||||
} else {
|
||||
assert.True(t, bt.withEvent)
|
||||
assert.Equal(t, test.expectedEventReason, bt.EventReason())
|
||||
assert.Equal(t, test.expectedEventMsg, bt.EventMessage())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCancelPodVolumeBackup(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
expectedEventReason string
|
||||
expectedEventMsg string
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "no fs backup",
|
||||
expectedEventReason: datapath.EventReasonCancelled,
|
||||
expectedEventMsg: "Data path for PVB fake-pvb canceled",
|
||||
expectedErr: datapath.ErrCancelled,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
pvbName := "fake-pvb"
|
||||
pvb := builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, pvbName).Result()
|
||||
|
||||
bt := &backupMsTestHelper{}
|
||||
|
||||
bs := &BackupMicroService{
|
||||
dataPathMgr: datapath.NewManager(1),
|
||||
eventRecorder: bt,
|
||||
resultSignal: make(chan dataPathResult),
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
go bs.cancelPodVolumeBackup(pvb)
|
||||
|
||||
result := <-bs.resultSignal
|
||||
|
||||
assert.EqualError(t, result.err, test.expectedErr)
|
||||
assert.True(t, bt.withEvent)
|
||||
assert.Equal(t, test.expectedEventReason, bt.EventReason())
|
||||
assert.Equal(t, test.expectedEventMsg, bt.EventMessage())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunCancelableDataPath(t *testing.T) {
|
||||
pvbName := "fake-pvb"
|
||||
pvb := builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, pvbName).Phase(velerov1api.PodVolumeBackupPhaseNew).Result()
|
||||
pvbInProgress := builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, pvbName).Phase(velerov1api.PodVolumeBackupPhaseInProgress).Result()
|
||||
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
ctx context.Context
|
||||
result *dataPathResult
|
||||
dataPathMgr *datapath.Manager
|
||||
kubeClientObj []runtime.Object
|
||||
initErr error
|
||||
startErr error
|
||||
dataPathStarted bool
|
||||
expectedEventMsg string
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "no pvb",
|
||||
ctx: ctxTimeout,
|
||||
expectedErr: "error waiting for PVB: context deadline exceeded",
|
||||
},
|
||||
{
|
||||
name: "pvb not in in-progress",
|
||||
ctx: ctxTimeout,
|
||||
kubeClientObj: []runtime.Object{pvb},
|
||||
expectedErr: "error waiting for PVB: context deadline exceeded",
|
||||
},
|
||||
{
|
||||
name: "create data path fail",
|
||||
ctx: context.Background(),
|
||||
kubeClientObj: []runtime.Object{pvbInProgress},
|
||||
dataPathMgr: datapath.NewManager(0),
|
||||
expectedErr: "error to create data path: Concurrent number exceeds",
|
||||
},
|
||||
{
|
||||
name: "init data path fail",
|
||||
ctx: context.Background(),
|
||||
kubeClientObj: []runtime.Object{pvbInProgress},
|
||||
initErr: errors.New("fake-init-error"),
|
||||
expectedErr: "error to initialize data path: fake-init-error",
|
||||
},
|
||||
{
|
||||
name: "start data path fail",
|
||||
ctx: context.Background(),
|
||||
kubeClientObj: []runtime.Object{pvbInProgress},
|
||||
startErr: errors.New("fake-start-error"),
|
||||
expectedErr: "error starting data path backup: fake-start-error",
|
||||
},
|
||||
{
|
||||
name: "data path timeout",
|
||||
ctx: ctxTimeout,
|
||||
kubeClientObj: []runtime.Object{pvbInProgress},
|
||||
dataPathStarted: true,
|
||||
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", pvbName),
|
||||
expectedErr: "timed out waiting for fs backup to complete",
|
||||
},
|
||||
{
|
||||
name: "data path returns error",
|
||||
ctx: context.Background(),
|
||||
kubeClientObj: []runtime.Object{pvbInProgress},
|
||||
dataPathStarted: true,
|
||||
result: &dataPathResult{
|
||||
err: errors.New("fake-data-path-error"),
|
||||
},
|
||||
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", pvbName),
|
||||
expectedErr: "fake-data-path-error",
|
||||
},
|
||||
{
|
||||
name: "succeed",
|
||||
ctx: context.Background(),
|
||||
kubeClientObj: []runtime.Object{pvbInProgress},
|
||||
dataPathStarted: true,
|
||||
result: &dataPathResult{
|
||||
result: "fake-succeed-result",
|
||||
},
|
||||
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", pvbName),
|
||||
},
|
||||
}
|
||||
|
||||
scheme := runtime.NewScheme()
|
||||
velerov1api.AddToScheme(scheme)
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
fakeClientBuilder := clientFake.NewClientBuilder()
|
||||
fakeClientBuilder = fakeClientBuilder.WithScheme(scheme)
|
||||
|
||||
fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()
|
||||
|
||||
bt := &backupMsTestHelper{}
|
||||
|
||||
bs := &BackupMicroService{
|
||||
namespace: velerov1api.DefaultNamespace,
|
||||
pvbName: pvbName,
|
||||
ctx: context.Background(),
|
||||
client: fakeClient,
|
||||
dataPathMgr: datapath.NewManager(1),
|
||||
eventRecorder: bt,
|
||||
resultSignal: make(chan dataPathResult),
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
if test.ctx != nil {
|
||||
bs.ctx = test.ctx
|
||||
}
|
||||
|
||||
if test.dataPathMgr != nil {
|
||||
bs.dataPathMgr = test.dataPathMgr
|
||||
}
|
||||
|
||||
datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
|
||||
fsBR := datapathmockes.NewAsyncBR(t)
|
||||
if test.initErr != nil {
|
||||
fsBR.On("Init", mock.Anything, mock.Anything).Return(test.initErr)
|
||||
}
|
||||
|
||||
if test.startErr != nil {
|
||||
fsBR.On("Init", mock.Anything, mock.Anything).Return(nil)
|
||||
fsBR.On("StartBackup", mock.Anything, mock.Anything, mock.Anything).Return(test.startErr)
|
||||
}
|
||||
|
||||
if test.dataPathStarted {
|
||||
fsBR.On("Init", mock.Anything, mock.Anything).Return(nil)
|
||||
fsBR.On("StartBackup", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||
}
|
||||
|
||||
return fsBR
|
||||
}
|
||||
|
||||
if test.result != nil {
|
||||
go func() {
|
||||
time.Sleep(time.Millisecond * 500)
|
||||
bs.resultSignal <- *test.result
|
||||
}()
|
||||
}
|
||||
|
||||
result, err := bs.RunCancelableDataPath(test.ctx)
|
||||
|
||||
if test.expectedErr != "" {
|
||||
assert.EqualError(t, err, test.expectedErr)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, test.result.result, result)
|
||||
}
|
||||
|
||||
if test.expectedEventMsg != "" {
|
||||
assert.True(t, bt.withEvent)
|
||||
assert.Equal(t, test.expectedEventMsg, bt.EventMessage())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
cancel()
|
||||
}
|
||||
@@ -17,12 +17,14 @@ limitations under the License.
|
||||
package podvolume
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
corev1api "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/podvolume/configs"
|
||||
repotypes "github.com/vmware-tanzu/velero/pkg/repository/types"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
)
|
||||
@@ -143,6 +145,19 @@ func GetSnapshotIdentifier(podVolumeBackups *velerov1api.PodVolumeBackupList) ma
|
||||
return res
|
||||
}
|
||||
|
||||
func GetRealSource(pvb *velerov1api.PodVolumeBackup) string {
|
||||
pvcName := ""
|
||||
if pvb.Annotations != nil {
|
||||
pvcName = pvb.Annotations[configs.PVCNameAnnotation]
|
||||
}
|
||||
|
||||
if pvcName != "" {
|
||||
return fmt.Sprintf("%s/%s/%s", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name, pvcName)
|
||||
} else {
|
||||
return fmt.Sprintf("%s/%s/%s", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name, pvb.Spec.Volume)
|
||||
}
|
||||
}
|
||||
|
||||
func getUploaderTypeOrDefault(uploaderType string) string {
|
||||
if uploaderType != "" {
|
||||
return uploaderType
|
||||
|
||||
@@ -300,3 +300,34 @@ func TestVolumeHasNonRestorableSource(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetRealSource(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
pvb *velerov1api.PodVolumeBackup
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "pvb with empty annotation",
|
||||
pvb: builder.ForPodVolumeBackup("fake-ns", "fake-name").PodNamespace("fake-pod-ns").PodName("fake-pod-name").Volume("fake-volume").Result(),
|
||||
expected: "fake-pod-ns/fake-pod-name/fake-volume",
|
||||
},
|
||||
{
|
||||
name: "pvb without pvc name annotation",
|
||||
pvb: builder.ForPodVolumeBackup("fake-ns", "fake-name").PodNamespace("fake-pod-ns").PodName("fake-pod-name").Volume("fake-volume").Annotations(map[string]string{}).Result(),
|
||||
expected: "fake-pod-ns/fake-pod-name/fake-volume",
|
||||
},
|
||||
{
|
||||
name: "pvb with pvc name annotation",
|
||||
pvb: builder.ForPodVolumeBackup("fake-ns", "fake-name").PodNamespace("fake-pod-ns").PodName("fake-pod-name").Volume("fake-volume").Annotations(map[string]string{"velero.io/pvc-name": "fake-pvc-name"}).Result(),
|
||||
expected: "fake-pod-ns/fake-pod-name/fake-pvc-name",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
actual := GetRealSource(tc.pvb)
|
||||
assert.Equal(t, tc.expected, actual)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user