Merge pull request #5214 from qiuming-best/uploader-restic

Uploader Implementation: Restic backup and restore
This commit is contained in:
lyndon
2022-09-02 11:11:38 +08:00
committed by GitHub
15 changed files with 580 additions and 363 deletions

View File

@@ -52,7 +52,6 @@ import (
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
"github.com/vmware-tanzu/velero/pkg/controller"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"
)
@@ -197,22 +196,27 @@ func (s *resticServer) run() {
s.logger.Fatalf("Failed to create credentials file store: %v", err)
}
credSecretStore, err := credentials.NewNamespacedSecretStore(s.mgr.GetClient(), s.namespace)
if err != nil {
s.logger.Fatalf("Failed to create secret file store: %v", err)
}
credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}
pvbReconciler := controller.PodVolumeBackupReconciler{
Scheme: s.mgr.GetScheme(),
Client: s.mgr.GetClient(),
Clock: clock.RealClock{},
Metrics: s.metrics,
CredsFileStore: credentialFileStore,
NodeName: s.nodeName,
FileSystem: filesystem.NewFileSystem(),
ResticExec: restic.BackupExec{},
Log: s.logger,
Scheme: s.mgr.GetScheme(),
Client: s.mgr.GetClient(),
Clock: clock.RealClock{},
Metrics: s.metrics,
CredentialGetter: credentialGetter,
NodeName: s.nodeName,
FileSystem: filesystem.NewFileSystem(),
Log: s.logger,
}
if err := pvbReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.PodVolumeBackup)
}
if err = controller.NewPodVolumeRestoreReconciler(s.logger, s.mgr.GetClient(), credentialFileStore).SetupWithManager(s.mgr); err != nil {
if err = controller.NewPodVolumeRestoreReconciler(s.logger, s.mgr.GetClient(), credentialGetter).SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}

View File

@@ -19,8 +19,6 @@ package controller
import (
"context"
"fmt"
"os"
"strings"
"time"
"github.com/pkg/errors"
@@ -28,6 +26,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
ctrl "sigs.k8s.io/controller-runtime"
@@ -35,31 +34,29 @@ import (
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/metrics"
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/repository/util"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/uploader/provider"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
// BackupExecuter runs backups.
type BackupExecuter interface {
RunBackup(*restic.Command, logrus.FieldLogger, func(velerov1api.PodVolumeOperationProgress)) (string, string, error)
GetSnapshotID(*restic.Command) (string, error)
}
// For unit test to mock function
var NewUploaderProviderFunc = provider.NewUploaderProvider
// PodVolumeBackupReconciler reconciles a PodVolumeBackup object
type PodVolumeBackupReconciler struct {
Scheme *runtime.Scheme
Client client.Client
Clock clock.Clock
Metrics *metrics.ServerMetrics
CredsFileStore credentials.FileStore
NodeName string
FileSystem filesystem.Interface
ResticExec BackupExecuter
Log logrus.FieldLogger
Scheme *runtime.Scheme
Client client.Client
Clock clock.Clock
Metrics *metrics.ServerMetrics
CredentialGetter *credentials.CredentialGetter
NodeName string
FileSystem filesystem.Interface
Log logrus.FieldLogger
}
type BackupProgressUpdater struct {
@@ -85,7 +82,6 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
return ctrl.Result{}, errors.Wrap(err, "getting PodVolumeBackup")
}
if len(pvb.OwnerReferences) == 1 {
log = log.WithField(
"backup",
@@ -128,16 +124,19 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("getting pod %s/%s", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name), log)
}
var resticDetails resticDetails
resticCmd, err := r.buildResticCommand(ctx, log, &pvb, &pod, &resticDetails)
volDir, err := kube.GetVolumeDirectory(ctx, log, &pod, pvb.Spec.Volume, r.Client)
if err != nil {
return r.updateStatusToFailed(ctx, &pvb, err, "building Restic command", log)
return r.updateStatusToFailed(ctx, &pvb, err, "getting volume directory name", log)
}
defer func() {
os.Remove(resticDetails.credsFile)
os.Remove(resticDetails.caCertFile)
}()
pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(pvb.Spec.Pod.UID), volDir)
log.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob")
path, err := kube.SinglePathMatch(pathGlob, r.FileSystem, log)
if err != nil {
return r.updateStatusToFailed(ctx, &pvb, err, "identifying unique volume path on host", log)
}
log.WithField("path", path).Debugf("Found path matching glob")
backupLocation := &velerov1api.BackupStorageLocation{}
if err := r.Client.Get(context.Background(), client.ObjectKey{
@@ -146,48 +145,55 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}, backupLocation); err != nil {
return ctrl.Result{}, errors.Wrap(err, "error getting backup storage location")
}
// #4820: restrieve insecureSkipTLSVerify from BSL configuration for
// AWS plugin. If nothing is return, that means insecureSkipTLSVerify
// is not enable for Restic command.
skipTLSRet := restic.GetInsecureSkipTLSVerifyFromBSL(backupLocation, log)
if len(skipTLSRet) > 0 {
resticCmd.ExtraFlags = append(resticCmd.ExtraFlags, skipTLSRet)
}
var stdout, stderr string
var emptySnapshot bool
stdout, stderr, err = r.ResticExec.RunBackup(resticCmd, log, r.updateBackupProgressFunc(&pvb, log))
selector := labels.SelectorFromSet(
map[string]string{
//TODO
//velerov1api.VolumeNamespaceLabel: label.GetValidName(volumeNamespace),
velerov1api.StorageLocationLabel: label.GetValidName(pvb.Spec.BackupStorageLocation),
//velerov1api.RepositoryTypeLabel: label.GetValidName(repositoryType),
},
)
backupRepo, err := util.GetBackupRepositoryByLabel(ctx, r.Client, pvb.Namespace, selector)
if err != nil {
if strings.Contains(stderr, "snapshot is empty") {
emptySnapshot = true
return ctrl.Result{}, errors.Wrap(err, "error getting backup repository")
}
var uploaderProv provider.Provider
uploaderProv, err = NewUploaderProviderFunc(ctx, r.Client, pvb.Spec.UploaderType, pvb.Spec.RepoIdentifier,
backupLocation, &backupRepo, r.CredentialGetter, repokey.RepoKeySelector(), log)
if err != nil {
return r.updateStatusToFailed(ctx, &pvb, err, "error creating uploader", log)
}
// If this is a PVC, look for the most recent completed pod volume backup for it and get
// its snapshot ID to do new backup based on it. Without this,
// if the pod using the PVC (and therefore the directory path under /host_pods/) has
// changed since the PVC's last backup, for backup, it will not be able to identify a suitable
// parent snapshot to use, and will have to do a full rescan of the contents of the PVC.
var parentSnapshotID string
if pvcUID, ok := pvb.Labels[velerov1api.PVCUIDLabel]; ok {
parentSnapshotID = r.getParentSnapshot(ctx, log, pvb.Namespace, pvcUID, pvb.Spec.BackupStorageLocation)
if parentSnapshotID == "" {
log.Info("No parent snapshot found for PVC, not based on parent snapshot for this backup")
} else {
return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("running Restic backup, stderr=%s", stderr), log)
log.WithField("parentSnapshotID", parentSnapshotID).Info("Based on parent snapshot for this backup")
}
}
log.Debugf("Ran command=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr)
var snapshotID string
if !emptySnapshot {
cmd := restic.GetSnapshotCommand(pvb.Spec.RepoIdentifier, resticDetails.credsFile, pvb.Spec.Tags)
cmd.Env = resticDetails.envs
cmd.CACertFile = resticDetails.caCertFile
// #4820: also apply the insecureTLS flag to Restic snapshots command
if len(skipTLSRet) > 0 {
cmd.ExtraFlags = append(cmd.ExtraFlags, skipTLSRet)
defer func() {
if err := uploaderProv.Close(ctx); err != nil {
log.Errorf("failed to close uploader provider with error %v", err)
}
}()
snapshotID, err = r.ResticExec.GetSnapshotID(cmd)
if err != nil {
return r.updateStatusToFailed(ctx, &pvb, err, "getting snapshot id", log)
}
snapshotID, emptySnapshot, err := uploaderProv.RunBackup(ctx, path, pvb.Spec.Tags, parentSnapshotID, r.NewBackupProgressUpdater(&pvb, log, ctx))
if err != nil {
return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("running backup, stderr=%v", err), log)
}
// Update status to Completed with path & snapshot ID.
original = pvb.DeepCopy()
pvb.Status.Path = resticDetails.path
pvb.Status.Path = path
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseCompleted
pvb.Status.SnapshotID = snapshotID
pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
@@ -202,8 +208,9 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
latencyDuration := pvb.Status.CompletionTimestamp.Time.Sub(pvb.Status.StartTimestamp.Time)
latencySeconds := float64(latencyDuration / time.Second)
backupName := fmt.Sprintf("%s/%s", req.Namespace, pvb.OwnerReferences[0].Name)
r.Metrics.ObserveResticOpLatency(r.NodeName, req.Name, resticCmd.Command, backupName, latencySeconds)
r.Metrics.RegisterResticOpLatencyGauge(r.NodeName, req.Name, resticCmd.Command, backupName, latencySeconds)
generateOpName := fmt.Sprintf("%s-%s-%s-%s-%s-backup", pvb.Name, backupRepo.Name, pvb.Spec.BackupStorageLocation, pvb.Namespace, pvb.Spec.UploaderType)
r.Metrics.ObserveResticOpLatency(r.NodeName, req.Name, generateOpName, backupName, latencySeconds)
r.Metrics.RegisterResticOpLatencyGauge(r.NodeName, req.Name, generateOpName, backupName, latencySeconds)
r.Metrics.RegisterPodVolumeBackupDequeue(r.NodeName)
log.Info("PodVolumeBackup completed")
@@ -272,18 +279,6 @@ func (r *PodVolumeBackupReconciler) getParentSnapshot(ctx context.Context, log l
return mostRecentPVB.Status.SnapshotID
}
// updateBackupProgressFunc returns a func that takes progress info and patches
// the PVB with the new progress.
func (r *PodVolumeBackupReconciler) updateBackupProgressFunc(pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) {
return func(progress velerov1api.PodVolumeOperationProgress) {
original := pvb.DeepCopy()
pvb.Status.Progress = progress
if err := r.Client.Patch(context.Background(), pvb, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error update progress")
}
}
}
func (r *PodVolumeBackupReconciler) updateStatusToFailed(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
original := pvb.DeepCopy()
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
@@ -298,81 +293,6 @@ func (r *PodVolumeBackupReconciler) updateStatusToFailed(ctx context.Context, pv
return ctrl.Result{}, nil
}
type resticDetails struct {
credsFile, caCertFile string
envs []string
path string
}
func (r *PodVolumeBackupReconciler) buildResticCommand(ctx context.Context, log *logrus.Entry, pvb *velerov1api.PodVolumeBackup, pod *corev1.Pod, details *resticDetails) (*restic.Command, error) {
volDir, err := kube.GetVolumeDirectory(ctx, log, pod, pvb.Spec.Volume, r.Client)
if err != nil {
return nil, errors.Wrap(err, "getting volume directory name")
}
pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(pvb.Spec.Pod.UID), volDir)
log.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob")
path, err := kube.SinglePathMatch(pathGlob, r.FileSystem, log)
if err != nil {
return nil, errors.Wrap(err, "identifying unique volume path on host")
}
log.WithField("path", path).Debugf("Found path matching glob")
// Temporary credentials.
details.credsFile, err = r.CredsFileStore.Path(repokey.RepoKeySelector())
if err != nil {
return nil, errors.Wrap(err, "creating temporary Restic credentials file")
}
cmd := restic.BackupCommand(pvb.Spec.RepoIdentifier, details.credsFile, path, pvb.Spec.Tags)
backupLocation := &velerov1api.BackupStorageLocation{}
if err := r.Client.Get(context.Background(), client.ObjectKey{
Namespace: pvb.Namespace,
Name: pvb.Spec.BackupStorageLocation,
}, backupLocation); err != nil {
return nil, errors.Wrap(err, "getting backup storage location")
}
// If there's a caCert on the ObjectStorage, write it to disk so that it can
// be passed to Restic.
if backupLocation.Spec.ObjectStorage != nil &&
backupLocation.Spec.ObjectStorage.CACert != nil {
details.caCertFile, err = restic.TempCACertFile(backupLocation.Spec.ObjectStorage.CACert, pvb.Spec.BackupStorageLocation, r.FileSystem)
if err != nil {
log.WithError(err).Error("creating temporary caCert file")
}
}
cmd.CACertFile = details.caCertFile
details.envs, err = restic.CmdEnv(backupLocation, r.CredsFileStore)
if err != nil {
return nil, errors.Wrap(err, "setting Restic command environment")
}
cmd.Env = details.envs
// If this is a PVC, look for the most recent completed PodVolumeBackup for
// it and get its Restic snapshot ID to use as the value of the `--parent`
// flag. Without this, if the pod using the PVC (and therefore the directory
// path under /host_pods/) has changed since the PVC's last backup, Restic
// will not be able to identify a suitable parent snapshot to use, and will
// have to do a full rescan of the contents of the PVC.
if pvcUID, ok := pvb.Labels[velerov1api.PVCUIDLabel]; ok {
parentSnapshotID := r.getParentSnapshot(ctx, log, pvb.Namespace, pvcUID, pvb.Spec.BackupStorageLocation)
if parentSnapshotID == "" {
log.Info("No parent snapshot found for PVC, not using --parent flag for this backup")
} else {
log.WithField("parentSnapshotID", parentSnapshotID).
Info("Setting --parent flag for this backup")
cmd.ExtraFlags = append(cmd.ExtraFlags, fmt.Sprintf("--parent=%s", parentSnapshotID))
}
}
return cmd, nil
}
func (r *PodVolumeBackupReconciler) NewBackupProgressUpdater(pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger, ctx context.Context) *BackupProgressUpdater {
return &BackupProgressUpdater{pvb, log, ctx, r.Client}
}

View File

@@ -24,6 +24,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -35,11 +36,13 @@ import (
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/restic/mocks"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/uploader/provider"
)
const name = "pvb-1"
@@ -68,12 +71,29 @@ func bslBuilder() *builder.BackupStorageLocationBuilder {
ForBackupStorageLocation(velerov1api.DefaultNamespace, "bsl-loc")
}
func buildBackupRepo() *velerov1api.BackupRepository {
return &velerov1api.BackupRepository{
Spec: velerov1api.BackupRepositorySpec{ResticIdentifier: ""},
TypeMeta: metav1.TypeMeta{
APIVersion: velerov1api.SchemeGroupVersion.String(),
Kind: "BackupRepository",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1api.DefaultNamespace,
Name: fmt.Sprintf("%s-bsl-loc-dn24h", velerov1api.DefaultNamespace),
Labels: map[string]string{
velerov1api.StorageLocationLabel: "bsl-loc",
},
},
}
}
var _ = Describe("PodVolumeBackup Reconciler", func() {
type request struct {
pvb *velerov1api.PodVolumeBackup
pod *corev1.Pod
bsl *velerov1api.BackupStorageLocation
pvb *velerov1api.PodVolumeBackup
pod *corev1.Pod
bsl *velerov1api.BackupStorageLocation
backupRepo *velerov1api.BackupRepository
expectedProcessed bool
expected *velerov1api.PodVolumeBackup
expectedRequeue ctrl.Result
@@ -100,31 +120,41 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
err = fakeClient.Create(ctx, test.bsl)
Expect(err).To(BeNil())
err = fakeClient.Create(ctx, test.backupRepo)
Expect(err).To(BeNil())
fakeFS := velerotest.NewFakeFileSystem()
pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", "", "pvb-1-volume")
_, err = fakeFS.Create(pathGlob)
Expect(err).To(BeNil())
credentialFileStore, err := credentials.NewNamespacedFileStore(
fakeClient,
velerov1api.DefaultNamespace,
"/tmp/credentials",
fakeFS,
)
// Setup reconciler
Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed())
r := PodVolumeBackupReconciler{
Client: fakeClient,
Clock: clock.NewFakeClock(now),
Metrics: metrics.NewResticServerMetrics(),
CredsFileStore: fakeCredsFileStore{},
NodeName: "test_node",
FileSystem: fakeFS,
ResticExec: mocks.FakeResticBackupExec{},
Log: velerotest.NewLogger(),
Client: fakeClient,
Clock: clock.NewFakeClock(now),
Metrics: metrics.NewResticServerMetrics(),
CredentialGetter: &credentials.CredentialGetter{FromFile: credentialFileStore},
NodeName: "test_node",
FileSystem: fakeFS,
Log: velerotest.NewLogger(),
}
NewUploaderProviderFunc = func(ctx context.Context, client kbclient.Client, uploaderType, repoIdentifier string, bsl *velerov1api.BackupStorageLocation, backupRepo *velerov1api.BackupRepository, credGetter *credentials.CredentialGetter, repoKeySelector *corev1.SecretKeySelector, log logrus.FieldLogger) (provider.Provider, error) {
return &fakeProvider{}, nil
}
actualResult, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: types.NamespacedName{
Namespace: velerov1api.DefaultNamespace,
Name: test.pvb.Name,
},
})
Expect(actualResult).To(BeEquivalentTo(test.expectedRequeue))
if test.expectedErrMsg == "" {
Expect(err).To(BeNil())
@@ -137,7 +167,6 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
Name: test.pvb.Name,
Namespace: test.pvb.Namespace,
}, &pvb)
// Assertions
if test.expected == nil {
Expect(apierrors.IsNotFound(err)).To(BeTrue())
@@ -160,6 +189,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
pvb: pvbBuilder().Phase("").Node("test_node").Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
backupRepo: buildBackupRepo(),
expectedProcessed: true,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseCompleted).
@@ -173,6 +203,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
backupRepo: buildBackupRepo(),
expectedProcessed: true,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseCompleted).
@@ -186,6 +217,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
backupRepo: buildBackupRepo(),
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseInProgress).
@@ -199,6 +231,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
backupRepo: buildBackupRepo(),
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseCompleted).
@@ -212,6 +245,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
backupRepo: buildBackupRepo(),
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseFailed).
@@ -225,6 +259,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
backupRepo: buildBackupRepo(),
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseFailed).
@@ -238,6 +273,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
backupRepo: buildBackupRepo(),
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseNew).
@@ -251,6 +287,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
backupRepo: buildBackupRepo(),
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseInProgress).
@@ -264,6 +301,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
backupRepo: buildBackupRepo(),
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseCompleted).
@@ -277,6 +315,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
backupRepo: buildBackupRepo(),
expectedProcessed: false,
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseFailed).
@@ -286,8 +325,26 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
)
})
type fakeCredsFileStore struct{}
func (f fakeCredsFileStore) Path(selector *corev1.SecretKeySelector) (string, error) {
return "/fake/path", nil
type fakeProvider struct {
}
func (f *fakeProvider) RunBackup(
ctx context.Context,
path string,
tags map[string]string,
parentSnapshot string,
updater uploader.ProgressUpdater) (string, bool, error) {
return "", false, nil
}
func (f *fakeProvider) RunRestore(
ctx context.Context,
snapshotID string,
volumePath string,
updater uploader.ProgressUpdater) error {
return nil
}
func (f *fakeProvider) Close(ctx context.Context) error {
return nil
}

View File

@@ -39,31 +39,33 @@ import (
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/podvolume"
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/repository/util"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/uploader/provider"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
func NewPodVolumeRestoreReconciler(logger logrus.FieldLogger, client client.Client, credentialsFileStore credentials.FileStore) *PodVolumeRestoreReconciler {
func NewPodVolumeRestoreReconciler(logger logrus.FieldLogger, client client.Client, credentialGetter *credentials.CredentialGetter) *PodVolumeRestoreReconciler {
return &PodVolumeRestoreReconciler{
Client: client,
logger: logger.WithField("controller", "PodVolumeRestore"),
credentialsFileStore: credentialsFileStore,
fileSystem: filesystem.NewFileSystem(),
clock: &clock.RealClock{},
Client: client,
logger: logger.WithField("controller", "PodVolumeRestore"),
credentialGetter: credentialGetter,
fileSystem: filesystem.NewFileSystem(),
clock: &clock.RealClock{},
}
}
type PodVolumeRestoreReconciler struct {
client.Client
logger logrus.FieldLogger
credentialsFileStore credentials.FileStore
fileSystem filesystem.Interface
clock clock.Clock
logger logrus.FieldLogger
credentialGetter *credentials.CredentialGetter
fileSystem filesystem.Interface
clock clock.Clock
}
type RestoreProgressUpdater struct {
@@ -240,20 +242,6 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve
return errors.Wrap(err, "error identifying path of volume")
}
credsFile, err := c.credentialsFileStore.Path(repokey.RepoKeySelector())
if err != nil {
return errors.Wrap(err, "error creating temp restic credentials file")
}
// ignore error since there's nothing we can do and it's a temp file.
defer os.Remove(credsFile)
resticCmd := restic.RestoreCommand(
req.Spec.RepoIdentifier,
credsFile,
req.Spec.SnapshotID,
volumePath,
)
backupLocation := &velerov1api.BackupStorageLocation{}
if err := c.Get(ctx, client.ObjectKey{
Namespace: req.Namespace,
@@ -262,38 +250,34 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve
return errors.Wrap(err, "error getting backup storage location")
}
// if there's a caCert on the ObjectStorage, write it to disk so that it can be passed to restic
var caCertFile string
if backupLocation.Spec.ObjectStorage != nil && backupLocation.Spec.ObjectStorage.CACert != nil {
caCertFile, err = restic.TempCACertFile(backupLocation.Spec.ObjectStorage.CACert, req.Spec.BackupStorageLocation, c.fileSystem)
if err != nil {
log.WithError(err).Error("Error creating temp cacert file")
}
// ignore error since there's nothing we can do and it's a temp file.
defer os.Remove(caCertFile)
}
resticCmd.CACertFile = caCertFile
env, err := restic.CmdEnv(backupLocation, c.credentialsFileStore)
selector := labels.SelectorFromSet(
map[string]string{
//TODO
//velerov1api.VolumeNamespaceLabel: label.GetValidName(volumeNamespace),
velerov1api.StorageLocationLabel: label.GetValidName(req.Spec.BackupStorageLocation),
//velerov1api.RepositoryTypeLabel: label.GetValidName(repositoryType),
},
)
backupRepo, err := util.GetBackupRepositoryByLabel(ctx, c.Client, req.Namespace, selector)
if err != nil {
return errors.Wrap(err, "error setting restic cmd env")
}
resticCmd.Env = env
// #4820: restrieve insecureSkipTLSVerify from BSL configuration for
// AWS plugin. If nothing is return, that means insecureSkipTLSVerify
// is not enable for Restic command.
skipTLSRet := restic.GetInsecureSkipTLSVerifyFromBSL(backupLocation, log)
if len(skipTLSRet) > 0 {
resticCmd.ExtraFlags = append(resticCmd.ExtraFlags, skipTLSRet)
return errors.Wrap(err, "error getting backup repository")
}
var stdout, stderr string
if stdout, stderr, err = restic.RunRestore(resticCmd, log, c.updateRestoreProgressFunc(req, log)); err != nil {
return errors.Wrapf(err, "error running restic restore, cmd=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr)
uploaderProv, err := provider.NewUploaderProvider(ctx, c.Client, req.Spec.UploaderType,
req.Spec.RepoIdentifier, backupLocation, &backupRepo, c.credentialGetter, repokey.RepoKeySelector(), log)
if err != nil {
return errors.Wrap(err, "error creating uploader")
}
defer func() {
if err := uploaderProv.Close(ctx); err != nil {
log.Errorf("failed to close uploader provider with error %v", err)
}
}()
if err = uploaderProv.RunRestore(ctx, req.Spec.SnapshotID, volumePath, c.NewRestoreProgressUpdater(req, log, ctx)); err != nil {
return errors.Wrapf(err, "error running restore err=%v", err)
}
log.Debugf("Ran command=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr)
// Remove the .velero directory from the restored volume (it may contain done files from previous restores
// of this volume, which we don't want to carry over). If this fails for any reason, log and continue, since
@@ -327,18 +311,6 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve
return nil
}
// updateRestoreProgressFunc returns a func that takes progress info and patches
// the PVR with the new progress
func (c *PodVolumeRestoreReconciler) updateRestoreProgressFunc(req *velerov1api.PodVolumeRestore, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) {
return func(progress velerov1api.PodVolumeOperationProgress) {
original := req.DeepCopy()
req.Status.Progress = progress
if err := c.Patch(context.Background(), req, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Unable to update PodVolumeRestore progress")
}
}
}
func (r *PodVolumeRestoreReconciler) NewRestoreProgressUpdater(pvr *velerov1api.PodVolumeRestore, log logrus.FieldLogger, ctx context.Context) *RestoreProgressUpdater {
return &RestoreProgressUpdater{pvr, log, ctx, r.Client}
}

View File

@@ -0,0 +1,45 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)
// GetBackupRepositoryByLabel which find backup repository through pvbNamespace, label
// name of BackupRepository is generated with prefix volumeNamespace-backupLocation- and end with random characters
// it could not retrieve the BackupRepository CR with namespace + name. so first list all CRs with in the pvbNamespace
// then filtering the matched CR by label
func GetBackupRepositoryByLabel(ctx context.Context, cli client.Client, pvbNamespace string, selector labels.Selector) (velerov1api.BackupRepository, error) {
backupRepoList := &velerov1api.BackupRepositoryList{}
if err := cli.List(ctx, backupRepoList, &client.ListOptions{
Namespace: pvbNamespace,
LabelSelector: selector,
}); err != nil {
return velerov1api.BackupRepository{}, errors.Wrap(err, "error getting backup repository list")
} else if len(backupRepoList.Items) == 1 {
return backupRepoList.Items[0], nil
} else {
return velerov1api.BackupRepository{}, errors.Errorf("unexpectedly find %d BackupRepository for workload namespace %s with label selector %v", len(backupRepoList.Items), pvbNamespace, selector)
}
}

View File

@@ -26,7 +26,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/exec"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)
@@ -71,7 +71,7 @@ func GetSnapshotID(snapshotIdCmd *Command) (string, error) {
// RunBackup runs a `restic backup` command and watches the output to provide
// progress updates to the caller.
func RunBackup(backupCmd *Command, log logrus.FieldLogger, updateFunc func(velerov1api.PodVolumeOperationProgress)) (string, string, error) {
func RunBackup(backupCmd *Command, log logrus.FieldLogger, updater uploader.ProgressUpdater) (string, string, error) {
// buffers for copying command stdout/err output into
stdoutBuf := new(bytes.Buffer)
stderrBuf := new(bytes.Buffer)
@@ -104,9 +104,9 @@ func RunBackup(backupCmd *Command, log logrus.FieldLogger, updateFunc func(veler
// if the line contains a non-empty bytes_done field, we can update the
// caller with the progress
if stat.BytesDone != 0 {
updateFunc(velerov1api.PodVolumeOperationProgress{
TotalBytes: stat.TotalBytes,
BytesDone: stat.BytesDone,
updater.UpdateProgress(&uploader.UploaderProgress{
TotalBytes: stat.TotalBytesProcessed,
BytesDone: stat.TotalBytesProcessed,
})
}
}
@@ -136,7 +136,7 @@ func RunBackup(backupCmd *Command, log logrus.FieldLogger, updateFunc func(veler
}
// update progress to 100%
updateFunc(velerov1api.PodVolumeOperationProgress{
updater.UpdateProgress(&uploader.UploaderProgress{
TotalBytes: stat.TotalBytesProcessed,
BytesDone: stat.TotalBytesProcessed,
})
@@ -184,7 +184,7 @@ func getSummaryLine(b []byte) ([]byte, error) {
// RunRestore runs a `restic restore` command and monitors the volume size to
// provide progress updates to the caller.
func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updateFunc func(velerov1api.PodVolumeOperationProgress)) (string, string, error) {
func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updater uploader.ProgressUpdater) (string, string, error) {
insecureTLSFlag := ""
for _, extraFlag := range restoreCmd.ExtraFlags {
@@ -198,7 +198,7 @@ func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updateFunc func(vel
return "", "", errors.Wrap(err, "error getting snapshot size")
}
updateFunc(velerov1api.PodVolumeOperationProgress{
updater.UpdateProgress(&uploader.UploaderProgress{
TotalBytes: snapshotSize,
})
@@ -216,10 +216,12 @@ func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updateFunc func(vel
log.WithError(err).Errorf("error getting restic restore progress")
}
updateFunc(velerov1api.PodVolumeOperationProgress{
TotalBytes: snapshotSize,
BytesDone: volumeSize,
})
if volumeSize != 0 {
updater.UpdateProgress(&uploader.UploaderProgress{
TotalBytes: snapshotSize,
BytesDone: volumeSize,
})
}
case <-quit:
ticker.Stop()
return
@@ -231,7 +233,7 @@ func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updateFunc func(vel
quit <- struct{}{}
// update progress to 100%
updateFunc(velerov1api.PodVolumeOperationProgress{
updater.UpdateProgress(&uploader.UploaderProgress{
TotalBytes: snapshotSize,
BytesDone: snapshotSize,
})

View File

@@ -1,37 +0,0 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package restic
import (
"github.com/sirupsen/logrus"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)
// BackupExec is able to run backups.
type BackupExec struct{}
// RunBackup is a wrapper for the restic.RunBackup function in order to be able
// to use interfaces (and swap out objects for testing purposes).
func (exec BackupExec) RunBackup(cmd *Command, log logrus.FieldLogger, updateFn func(velerov1api.PodVolumeOperationProgress)) (string, string, error) {
return RunBackup(cmd, log, updateFn)
}
// GetSnapshotID gets the Restic snapshot ID.
func (exec BackupExec) GetSnapshotID(snapshotIdCmd *Command) (string, error) {
return GetSnapshotID(snapshotIdCmd)
}

View File

@@ -1,37 +0,0 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mocks
import (
"github.com/sirupsen/logrus"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/restic"
)
// FakeResticBackupExec represents an object that can run backups.
type FakeResticBackupExec struct{}
// RunBackup runs a Restic backup.
func (exec FakeResticBackupExec) RunBackup(cmd *restic.Command, log logrus.FieldLogger, updateFn func(velerov1api.PodVolumeOperationProgress)) (string, string, error) {
return "", "", nil
}
// GetSnapshotID gets the Restic snapshot ID.
func (exec FakeResticBackupExec) GetSnapshotID(cmd *restic.Command) (string, error) {
return "", nil
}

View File

@@ -18,6 +18,7 @@ package kopia
import (
"context"
"io/ioutil"
"math"
"os"
"path/filepath"
@@ -83,13 +84,21 @@ func setupDefaultPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceIn
//Backup backup specific sourcePath and update progress
func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string,
parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
if fsUploader == nil {
return nil, errors.New("get empty kopia uploader")
return nil, false, errors.New("get empty kopia uploader")
}
dir, err := filepath.Abs(sourcePath)
if err != nil {
return nil, errors.Wrapf(err, "Invalid source path '%s'", sourcePath)
return nil, false, errors.Wrapf(err, "Invalid source path '%s'", sourcePath)
}
// to be consistent with restic when backup empty dir returns one error for upper logic handle
dirs, err := ioutil.ReadDir(dir)
if err != nil {
return nil, false, errors.Wrapf(err, "Unable to read dir in path %s", dir)
} else if len(dirs) == 0 {
return nil, true, nil
}
sourceInfo := snapshot.SourceInfo{
@@ -97,14 +106,13 @@ func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter rep
Host: udmrepo.GetRepoDomain(),
Path: filepath.Clean(dir),
}
rootDir, err := getLocalFSEntry(sourceInfo.Path)
if err != nil {
return nil, errors.Wrap(err, "Unable to get local filesystem entry")
return nil, false, errors.Wrap(err, "Unable to get local filesystem entry")
}
snapID, snapshotSize, err := SnapshotSource(ctx, repoWriter, fsUploader, sourceInfo, rootDir, parentSnapshot, log, "Kopia Uploader")
if err != nil {
return nil, err
return nil, false, err
}
snapshotInfo := &uploader.SnapshotInfo{
@@ -112,7 +120,7 @@ func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter rep
Size: snapshotSize,
}
return snapshotInfo, nil
return snapshotInfo, false, nil
}
func getLocalFSEntry(path0 string) (fs.Entry, error) {

View File

@@ -97,19 +97,20 @@ func (kp *kopiaProvider) CheckContext(ctx context.Context, finishChan chan struc
}
}
func (kp *kopiaProvider) Close(ctx context.Context) {
kp.bkRepo.Close(ctx)
func (kp *kopiaProvider) Close(ctx context.Context) error {
return kp.bkRepo.Close(ctx)
}
//RunBackup which will backup specific path and update backup progress
// RunBackup which will backup specific path and update backup progress
// return snapshotID, isEmptySnapshot, error
func (kp *kopiaProvider) RunBackup(
ctx context.Context,
path string,
tags map[string]string,
parentSnapshot string,
updater uploader.ProgressUpdater) (string, error) {
updater uploader.ProgressUpdater) (string, bool, error) {
if updater == nil {
return "", errors.New("Need to initial backup progress updater first")
return "", false, errors.New("Need to initial backup progress updater first")
}
log := kp.log.WithFields(logrus.Fields{
@@ -130,13 +131,16 @@ func (kp *kopiaProvider) RunBackup(
close(quit)
}()
snapshotInfo, err := BackupFunc(ctx, kpUploader, repoWriter, path, parentSnapshot, log)
snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, parentSnapshot, log)
if err != nil {
return "", errors.Wrapf(err, "Failed to run kopia backup")
return "", false, errors.Wrapf(err, "Failed to run kopia backup")
} else if isSnapshotEmpty {
log.Debugf("Kopia backup got empty dir with path %s", path)
return "", true, nil
} else if snapshotInfo == nil {
return "", fmt.Errorf("failed to get kopia backup snapshot info for path %v", path)
return "", false, fmt.Errorf("failed to get kopia backup snapshot info for path %v", path)
}
// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
updater.UpdateProgress(
&uploader.UploaderProgress{
@@ -146,7 +150,7 @@ func (kp *kopiaProvider) RunBackup(
)
log.Debugf("Kopia backup finished, snapshot ID %s, backup size %d", snapshotInfo.ID, snapshotInfo.Size)
return snapshotInfo.ID, nil
return snapshotInfo.ID, false, nil
}
func (kp *kopiaProvider) GetPassword(param interface{}) (string, error) {

View File

@@ -25,10 +25,10 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/controller"
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/uploader/kopia"
@@ -37,30 +37,30 @@ import (
func TestRunBackup(t *testing.T) {
var kp kopiaProvider
kp.log = logrus.New()
updater := controller.BackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)}
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)}
testCases := []struct {
name string
hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error)
hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error)
notError bool
}{
{
name: "success to backup",
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
return &uploader.SnapshotInfo{}, nil
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return &uploader.SnapshotInfo{}, false, nil
},
notError: true,
},
{
name: "get error to backup",
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
return &uploader.SnapshotInfo{}, errors.New("failed to backup")
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return &uploader.SnapshotInfo{}, false, errors.New("failed to backup")
},
notError: false,
},
{
name: "got empty snapshot",
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
return nil, nil
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return nil, true, errors.New("snapshot is empty")
},
notError: false,
},
@@ -68,7 +68,7 @@ func TestRunBackup(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
BackupFunc = tc.hookBackupFunc
_, err := kp.RunBackup(context.Background(), "var", nil, "", &updater)
_, _, err := kp.RunBackup(context.Background(), "var", nil, "", &updater)
if tc.notError {
assert.NoError(t, err)
} else {
@@ -81,7 +81,7 @@ func TestRunBackup(t *testing.T) {
func TestRunRestore(t *testing.T) {
var kp kopiaProvider
kp.log = logrus.New()
updater := controller.RestoreProgressUpdater{PodVolumeRestore: &velerov1api.PodVolumeRestore{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)}
updater := FakeRestoreProgressUpdater{PodVolumeRestore: &velerov1api.PodVolumeRestore{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)}
testCases := []struct {
name string
@@ -116,3 +116,21 @@ func TestRunRestore(t *testing.T) {
})
}
}
type FakeBackupProgressUpdater struct {
PodVolumeBackup *velerov1api.PodVolumeBackup
Log logrus.FieldLogger
Ctx context.Context
Cli client.Client
}
func (f *FakeBackupProgressUpdater) UpdateProgress(p *uploader.UploaderProgress) {}
type FakeRestoreProgressUpdater struct {
PodVolumeRestore *velerov1api.PodVolumeRestore
Log logrus.FieldLogger
Ctx context.Context
Cli client.Client
}
func (f *FakeRestoreProgressUpdater) UpdateProgress(p *uploader.UploaderProgress) {}

View File

@@ -20,11 +20,16 @@ import (
"context"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/repository/provider"
"github.com/vmware-tanzu/velero/pkg/uploader"
)
@@ -33,14 +38,14 @@ const backupProgressCheckInterval = 10 * time.Second
// Provider which is designed for one pod volumn to do the backup or restore
type Provider interface {
// RunBackup which will do backup for one specific volumn and return snapshotID error
// RunBackup which will do backup for one specific volumn and return snapshotID, isSnapshotEmpty, error
// updater is used for updating backup progress which implement by third-party
RunBackup(
ctx context.Context,
path string,
tags map[string]string,
parentSnapshot string,
updater uploader.ProgressUpdater) (string, error)
updater uploader.ProgressUpdater) (string, bool, error)
// RunRestore which will do restore for one specific volumn with given snapshot id and return error
// updater is used for updating backup progress which implement by third-party
RunRestore(
@@ -49,23 +54,33 @@ type Provider interface {
volumePath string,
updater uploader.ProgressUpdater) error
// Close which will close related repository
Close(ctx context.Context)
Close(ctx context.Context) error
}
// NewUploaderProvider initialize provider with specific uploaderType
func NewUploaderProvider(
ctx context.Context,
client client.Client,
uploaderType string,
repoIdentifier string,
bsl *velerov1api.BackupStorageLocation,
backupReo *velerov1api.BackupRepository,
backupRepo *velerov1api.BackupRepository,
credGetter *credentials.CredentialGetter,
repoKeySelector *v1.SecretKeySelector,
log logrus.FieldLogger,
) (Provider, error) {
if credGetter.FromFile == nil {
return nil, errors.New("uninitialized FileStore credentail is not supported")
}
if uploaderType == uploader.KopiaType {
return NewResticUploaderProvider(repoIdentifier, bsl, credGetter, repoKeySelector, log)
if err := provider.NewUnifiedRepoProvider(*credGetter, log).ConnectToRepo(ctx, provider.RepoParam{BackupLocation: bsl, BackupRepo: backupRepo}); err != nil {
return nil, errors.Wrap(err, "failed to connect repository")
}
return NewKopiaUploaderProvider(ctx, credGetter, backupRepo, log)
} else {
return NewKopiaUploaderProvider(ctx, credGetter, backupReo, log)
if err := provider.NewResticRepositoryProvider(credGetter.FromFile, nil, log).ConnectToRepo(ctx, provider.RepoParam{BackupLocation: bsl, BackupRepo: backupRepo}); err != nil {
return nil, errors.Wrap(err, "failed to connect repository")
}
return NewResticUploaderProvider(repoIdentifier, bsl, credGetter, repoKeySelector, log)
}
}

View File

@@ -13,16 +13,40 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provider
import (
"context"
"fmt"
"os"
"strings"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)
// mainly used to make testing more convenient
var ResticBackupCMDFunc = restic.BackupCommand
var ResticRestoreCMDFunc = restic.RestoreCommand
type resticProvider struct {
repoIdentifier string
credentialsFile string
caCertFile string
cmdEnv []string
extraFlags []string
bsl *velerov1api.BackupStorageLocation
log logrus.FieldLogger
}
func NewResticUploaderProvider(
repoIdentifier string,
bsl *velerov1api.BackupStorageLocation,
@@ -30,5 +54,120 @@ func NewResticUploaderProvider(
repoKeySelector *v1.SecretKeySelector,
log logrus.FieldLogger,
) (Provider, error) {
return nil, nil //TODO
provider := resticProvider{
repoIdentifier: repoIdentifier,
bsl: bsl,
log: log,
}
var err error
provider.credentialsFile, err = credGetter.FromFile.Path(repoKeySelector)
if err != nil {
return nil, errors.Wrap(err, "error creating temp restic credentials file")
}
// if there's a caCert on the ObjectStorage, write it to disk so that it can be passed to restic
if bsl.Spec.ObjectStorage != nil && bsl.Spec.ObjectStorage.CACert != nil {
provider.caCertFile, err = restic.TempCACertFile(bsl.Spec.ObjectStorage.CACert, bsl.Name, filesystem.NewFileSystem())
if err != nil {
return nil, errors.Wrap(err, "error create temp cert file")
}
}
provider.cmdEnv, err = restic.CmdEnv(bsl, credGetter.FromFile)
if err != nil {
return nil, errors.Wrap(err, "error generating repository cmnd env")
}
// #4820: restrieve insecureSkipTLSVerify from BSL configuration for
// AWS plugin. If nothing is return, that means insecureSkipTLSVerify
// is not enable for Restic command.
skipTLSRet := restic.GetInsecureSkipTLSVerifyFromBSL(bsl, log)
if len(skipTLSRet) > 0 {
provider.extraFlags = append(provider.extraFlags, skipTLSRet)
}
return &provider, nil
}
func (rp *resticProvider) Close(ctx context.Context) error {
if err := os.Remove(rp.credentialsFile); err != nil {
rp.log.Warnf("Failed to remove file %s with err %v", rp.credentialsFile, err)
}
if err := os.Remove(rp.caCertFile); err != nil {
rp.log.Warnf("Failed to remove file %s with err %v", rp.caCertFile, err)
}
return nil
}
// RunBackup runs a `backup` command and watches the output to provide
// progress updates to the caller and return snapshotID, isEmptySnapshot, error
func (rp *resticProvider) RunBackup(
ctx context.Context,
path string,
tags map[string]string,
parentSnapshot string,
updater uploader.ProgressUpdater) (string, bool, error) {
if updater == nil {
return "", false, errors.New("Need to initial backup progress updater first")
}
log := rp.log.WithFields(logrus.Fields{
"path": path,
"parentSnapshot": parentSnapshot,
})
backupCmd := ResticBackupCMDFunc(rp.repoIdentifier, rp.credentialsFile, path, tags)
backupCmd.Env = rp.cmdEnv
backupCmd.CACertFile = rp.caCertFile
backupCmd.ExtraFlags = rp.extraFlags
if parentSnapshot != "" {
backupCmd.ExtraFlags = append(backupCmd.ExtraFlags, fmt.Sprintf("--parent=%s", parentSnapshot))
}
summary, stderrBuf, err := restic.RunBackup(backupCmd, log, updater)
if err != nil {
if strings.Contains(err.Error(), "snapshot is empty") {
log.Debugf("Restic backup got empty dir with %s path", path)
return "", true, nil
}
return "", false, errors.WithStack(fmt.Errorf("error running restic backup with error: %v", err))
}
// GetSnapshotID
snapshotIdCmd := restic.GetSnapshotCommand(rp.repoIdentifier, rp.credentialsFile, tags)
snapshotIdCmd.Env = rp.cmdEnv
snapshotIdCmd.CACertFile = rp.caCertFile
snapshotID, err := restic.GetSnapshotID(snapshotIdCmd)
if err != nil {
return "", false, errors.WithStack(fmt.Errorf("error getting snapshot id with error: %v", err))
}
log.Debugf("Run command=%s, stdout=%s, stderr=%s", backupCmd.String(), summary, stderrBuf)
return snapshotID, false, nil
}
// RunRestore runs a `restore` command and monitors the volume size to
// provide progress updates to the caller.
func (rp *resticProvider) RunRestore(
ctx context.Context,
snapshotID string,
volumePath string,
updater uploader.ProgressUpdater) error {
if updater == nil {
return errors.New("Need to initial backup progress updater first")
}
log := rp.log.WithFields(logrus.Fields{
"snapshotID": snapshotID,
"volumePath": volumePath,
})
restoreCmd := ResticRestoreCMDFunc(rp.repoIdentifier, rp.credentialsFile, snapshotID, volumePath)
restoreCmd.Env = rp.cmdEnv
restoreCmd.CACertFile = rp.caCertFile
restoreCmd.ExtraFlags = rp.extraFlags
stdout, stderr, err := restic.RunRestore(restoreCmd, log, updater)
log.Debugf("Run command=%s, stdout=%s, stderr=%s", restoreCmd.Command, stdout, stderr)
return err
}

View File

@@ -0,0 +1,106 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provider
import (
"context"
"strings"
"testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/uploader"
)
func TestResticRunBackup(t *testing.T) {
var rp resticProvider
rp.log = logrus.New()
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: rp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)}
testCases := []struct {
name string
hookBackupFunc func(repoIdentifier string, passwordFile string, path string, tags map[string]string) *restic.Command
hookRunBackupFunc func(backupCmd *restic.Command, log logrus.FieldLogger, updater uploader.ProgressUpdater) (string, string, error)
errorHandleFunc func(err error) bool
}{
{
name: "wrong restic execute command",
hookBackupFunc: func(repoIdentifier string, passwordFile string, path string, tags map[string]string) *restic.Command {
return &restic.Command{Command: "date"}
},
errorHandleFunc: func(err error) bool {
return strings.Contains(err.Error(), "executable file not found in")
},
},
{
name: "wrong parsing json summary content",
hookBackupFunc: func(repoIdentifier string, passwordFile string, path string, tags map[string]string) *restic.Command {
return &restic.Command{Command: "version"}
},
errorHandleFunc: func(err error) bool {
return strings.Contains(err.Error(), "executable file not found in")
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ResticBackupCMDFunc = tc.hookBackupFunc
_, _, err := rp.RunBackup(context.Background(), "var", nil, "", &updater)
rp.log.Infof("test name %v error %v", tc.name, err)
require.Equal(t, true, tc.errorHandleFunc(err))
})
}
}
func TestResticRunRestore(t *testing.T) {
var rp resticProvider
rp.log = logrus.New()
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: rp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)}
ResticRestoreCMDFunc = func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command {
return &restic.Command{Args: []string{""}}
}
testCases := []struct {
name string
hookResticRestoreFunc func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command
errorHandleFunc func(err error) bool
}{
{
name: "wrong restic execute command",
hookResticRestoreFunc: func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command {
return &restic.Command{Args: []string{"date"}}
},
errorHandleFunc: func(err error) bool {
return strings.Contains(err.Error(), "executable file not found ")
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ResticRestoreCMDFunc = tc.hookResticRestoreFunc
err := rp.RunRestore(context.Background(), "", "var", &updater)
rp.log.Infof("test name %v error %v", tc.name, err)
require.Equal(t, true, tc.errorHandleFunc(err))
})
}
}