diff --git a/changelogs/unreleased/5214-qiuming-best b/changelogs/unreleased/5214-qiuming-best new file mode 100644 index 000000000..9c2ddd1de --- /dev/null +++ b/changelogs/unreleased/5214-qiuming-best @@ -0,0 +1 @@ +Uploader Implementation: Restic backup and restore diff --git a/pkg/cmd/cli/restic/server.go b/pkg/cmd/cli/restic/server.go index ae593306b..73d7cdec5 100644 --- a/pkg/cmd/cli/restic/server.go +++ b/pkg/cmd/cli/restic/server.go @@ -52,7 +52,6 @@ import ( "github.com/vmware-tanzu/velero/pkg/cmd/util/signals" "github.com/vmware-tanzu/velero/pkg/controller" "github.com/vmware-tanzu/velero/pkg/metrics" - "github.com/vmware-tanzu/velero/pkg/restic" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/logging" ) @@ -197,22 +196,27 @@ func (s *resticServer) run() { s.logger.Fatalf("Failed to create credentials file store: %v", err) } + credSecretStore, err := credentials.NewNamespacedSecretStore(s.mgr.GetClient(), s.namespace) + if err != nil { + s.logger.Fatalf("Failed to create secret file store: %v", err) + } + + credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore} pvbReconciler := controller.PodVolumeBackupReconciler{ - Scheme: s.mgr.GetScheme(), - Client: s.mgr.GetClient(), - Clock: clock.RealClock{}, - Metrics: s.metrics, - CredsFileStore: credentialFileStore, - NodeName: s.nodeName, - FileSystem: filesystem.NewFileSystem(), - ResticExec: restic.BackupExec{}, - Log: s.logger, + Scheme: s.mgr.GetScheme(), + Client: s.mgr.GetClient(), + Clock: clock.RealClock{}, + Metrics: s.metrics, + CredentialGetter: credentialGetter, + NodeName: s.nodeName, + FileSystem: filesystem.NewFileSystem(), + Log: s.logger, } if err := pvbReconciler.SetupWithManager(s.mgr); err != nil { s.logger.Fatal(err, "unable to create controller", "controller", controller.PodVolumeBackup) } - if err = controller.NewPodVolumeRestoreReconciler(s.logger, s.mgr.GetClient(), 
credentialFileStore).SetupWithManager(s.mgr); err != nil { + if err = controller.NewPodVolumeRestoreReconciler(s.logger, s.mgr.GetClient(), credentialGetter).SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") } diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index 635ef00a7..4e07edb89 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -19,8 +19,6 @@ package controller import ( "context" "fmt" - "os" - "strings" "time" "github.com/pkg/errors" @@ -28,6 +26,7 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" ctrl "sigs.k8s.io/controller-runtime" @@ -35,31 +34,29 @@ import ( "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/label" "github.com/vmware-tanzu/velero/pkg/metrics" repokey "github.com/vmware-tanzu/velero/pkg/repository/keys" - "github.com/vmware-tanzu/velero/pkg/restic" + "github.com/vmware-tanzu/velero/pkg/repository/util" "github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/uploader/provider" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/kube" ) -// BackupExecuter runs backups. 
-type BackupExecuter interface { - RunBackup(*restic.Command, logrus.FieldLogger, func(velerov1api.PodVolumeOperationProgress)) (string, string, error) - GetSnapshotID(*restic.Command) (string, error) -} +// For unit test to mock function +var NewUploaderProviderFunc = provider.NewUploaderProvider // PodVolumeBackupReconciler reconciles a PodVolumeBackup object type PodVolumeBackupReconciler struct { - Scheme *runtime.Scheme - Client client.Client - Clock clock.Clock - Metrics *metrics.ServerMetrics - CredsFileStore credentials.FileStore - NodeName string - FileSystem filesystem.Interface - ResticExec BackupExecuter - Log logrus.FieldLogger + Scheme *runtime.Scheme + Client client.Client + Clock clock.Clock + Metrics *metrics.ServerMetrics + CredentialGetter *credentials.CredentialGetter + NodeName string + FileSystem filesystem.Interface + Log logrus.FieldLogger } type BackupProgressUpdater struct { @@ -85,7 +82,6 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ } return ctrl.Result{}, errors.Wrap(err, "getting PodVolumeBackup") } - if len(pvb.OwnerReferences) == 1 { log = log.WithField( "backup", @@ -128,16 +124,19 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("getting pod %s/%s", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name), log) } - var resticDetails resticDetails - resticCmd, err := r.buildResticCommand(ctx, log, &pvb, &pod, &resticDetails) + volDir, err := kube.GetVolumeDirectory(ctx, log, &pod, pvb.Spec.Volume, r.Client) if err != nil { - return r.updateStatusToFailed(ctx, &pvb, err, "building Restic command", log) + return r.updateStatusToFailed(ctx, &pvb, err, "getting volume directory name", log) } - defer func() { - os.Remove(resticDetails.credsFile) - os.Remove(resticDetails.caCertFile) - }() + pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(pvb.Spec.Pod.UID), volDir) + log.WithField("pathGlob", 
pathGlob).Debug("Looking for path matching glob") + + path, err := kube.SinglePathMatch(pathGlob, r.FileSystem, log) + if err != nil { + return r.updateStatusToFailed(ctx, &pvb, err, "identifying unique volume path on host", log) + } + log.WithField("path", path).Debugf("Found path matching glob") backupLocation := &velerov1api.BackupStorageLocation{} if err := r.Client.Get(context.Background(), client.ObjectKey{ @@ -146,48 +145,55 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ }, backupLocation); err != nil { return ctrl.Result{}, errors.Wrap(err, "error getting backup storage location") } - - // #4820: restrieve insecureSkipTLSVerify from BSL configuration for - // AWS plugin. If nothing is return, that means insecureSkipTLSVerify - // is not enable for Restic command. - skipTLSRet := restic.GetInsecureSkipTLSVerifyFromBSL(backupLocation, log) - if len(skipTLSRet) > 0 { - resticCmd.ExtraFlags = append(resticCmd.ExtraFlags, skipTLSRet) - } - - var stdout, stderr string - - var emptySnapshot bool - stdout, stderr, err = r.ResticExec.RunBackup(resticCmd, log, r.updateBackupProgressFunc(&pvb, log)) + selector := labels.SelectorFromSet( + map[string]string{ + //TODO + //velerov1api.VolumeNamespaceLabel: label.GetValidName(volumeNamespace), + velerov1api.StorageLocationLabel: label.GetValidName(pvb.Spec.BackupStorageLocation), + //velerov1api.RepositoryTypeLabel: label.GetValidName(repositoryType), + }, + ) + backupRepo, err := util.GetBackupRepositoryByLabel(ctx, r.Client, pvb.Namespace, selector) if err != nil { - if strings.Contains(stderr, "snapshot is empty") { - emptySnapshot = true + return ctrl.Result{}, errors.Wrap(err, "error getting backup repository") + } + + var uploaderProv provider.Provider + uploaderProv, err = NewUploaderProviderFunc(ctx, r.Client, pvb.Spec.UploaderType, pvb.Spec.RepoIdentifier, + backupLocation, &backupRepo, r.CredentialGetter, repokey.RepoKeySelector(), log) + if err != nil { + return 
r.updateStatusToFailed(ctx, &pvb, err, "error creating uploader", log) + } + + // If this is a PVC, look for the most recent completed pod volume backup for it and get + // its snapshot ID to do new backup based on it. Without this, + // if the pod using the PVC (and therefore the directory path under /host_pods/) has + // changed since the PVC's last backup, for backup, it will not be able to identify a suitable + // parent snapshot to use, and will have to do a full rescan of the contents of the PVC. + var parentSnapshotID string + if pvcUID, ok := pvb.Labels[velerov1api.PVCUIDLabel]; ok { + parentSnapshotID = r.getParentSnapshot(ctx, log, pvb.Namespace, pvcUID, pvb.Spec.BackupStorageLocation) + if parentSnapshotID == "" { + log.Info("No parent snapshot found for PVC, not based on parent snapshot for this backup") } else { - return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("running Restic backup, stderr=%s", stderr), log) + log.WithField("parentSnapshotID", parentSnapshotID).Info("Based on parent snapshot for this backup") } } - log.Debugf("Ran command=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr) - var snapshotID string - if !emptySnapshot { - cmd := restic.GetSnapshotCommand(pvb.Spec.RepoIdentifier, resticDetails.credsFile, pvb.Spec.Tags) - cmd.Env = resticDetails.envs - cmd.CACertFile = resticDetails.caCertFile - - // #4820: also apply the insecureTLS flag to Restic snapshots command - if len(skipTLSRet) > 0 { - cmd.ExtraFlags = append(cmd.ExtraFlags, skipTLSRet) + defer func() { + if err := uploaderProv.Close(ctx); err != nil { + log.Errorf("failed to close uploader provider with error %v", err) } + }() - snapshotID, err = r.ResticExec.GetSnapshotID(cmd) - if err != nil { - return r.updateStatusToFailed(ctx, &pvb, err, "getting snapshot id", log) - } + snapshotID, emptySnapshot, err := uploaderProv.RunBackup(ctx, path, pvb.Spec.Tags, parentSnapshotID, r.NewBackupProgressUpdater(&pvb, log, ctx)) + if err != nil { + return 
r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("running backup, stderr=%v", err), log) } // Update status to Completed with path & snapshot ID. original = pvb.DeepCopy() - pvb.Status.Path = resticDetails.path + pvb.Status.Path = path pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseCompleted pvb.Status.SnapshotID = snapshotID pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} @@ -202,8 +208,9 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ latencyDuration := pvb.Status.CompletionTimestamp.Time.Sub(pvb.Status.StartTimestamp.Time) latencySeconds := float64(latencyDuration / time.Second) backupName := fmt.Sprintf("%s/%s", req.Namespace, pvb.OwnerReferences[0].Name) - r.Metrics.ObserveResticOpLatency(r.NodeName, req.Name, resticCmd.Command, backupName, latencySeconds) - r.Metrics.RegisterResticOpLatencyGauge(r.NodeName, req.Name, resticCmd.Command, backupName, latencySeconds) + generateOpName := fmt.Sprintf("%s-%s-%s-%s-%s-backup", pvb.Name, backupRepo.Name, pvb.Spec.BackupStorageLocation, pvb.Namespace, pvb.Spec.UploaderType) + r.Metrics.ObserveResticOpLatency(r.NodeName, req.Name, generateOpName, backupName, latencySeconds) + r.Metrics.RegisterResticOpLatencyGauge(r.NodeName, req.Name, generateOpName, backupName, latencySeconds) r.Metrics.RegisterPodVolumeBackupDequeue(r.NodeName) log.Info("PodVolumeBackup completed") @@ -272,18 +279,6 @@ func (r *PodVolumeBackupReconciler) getParentSnapshot(ctx context.Context, log l return mostRecentPVB.Status.SnapshotID } -// updateBackupProgressFunc returns a func that takes progress info and patches -// the PVB with the new progress. 
-func (r *PodVolumeBackupReconciler) updateBackupProgressFunc(pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) { - return func(progress velerov1api.PodVolumeOperationProgress) { - original := pvb.DeepCopy() - pvb.Status.Progress = progress - if err := r.Client.Patch(context.Background(), pvb, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("error update progress") - } - } -} - func (r *PodVolumeBackupReconciler) updateStatusToFailed(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) { original := pvb.DeepCopy() pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed @@ -298,81 +293,6 @@ func (r *PodVolumeBackupReconciler) updateStatusToFailed(ctx context.Context, pv return ctrl.Result{}, nil } -type resticDetails struct { - credsFile, caCertFile string - envs []string - path string -} - -func (r *PodVolumeBackupReconciler) buildResticCommand(ctx context.Context, log *logrus.Entry, pvb *velerov1api.PodVolumeBackup, pod *corev1.Pod, details *resticDetails) (*restic.Command, error) { - volDir, err := kube.GetVolumeDirectory(ctx, log, pod, pvb.Spec.Volume, r.Client) - if err != nil { - return nil, errors.Wrap(err, "getting volume directory name") - } - - pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(pvb.Spec.Pod.UID), volDir) - log.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob") - - path, err := kube.SinglePathMatch(pathGlob, r.FileSystem, log) - if err != nil { - return nil, errors.Wrap(err, "identifying unique volume path on host") - } - log.WithField("path", path).Debugf("Found path matching glob") - - // Temporary credentials. 
- details.credsFile, err = r.CredsFileStore.Path(repokey.RepoKeySelector()) - if err != nil { - return nil, errors.Wrap(err, "creating temporary Restic credentials file") - } - - cmd := restic.BackupCommand(pvb.Spec.RepoIdentifier, details.credsFile, path, pvb.Spec.Tags) - - backupLocation := &velerov1api.BackupStorageLocation{} - if err := r.Client.Get(context.Background(), client.ObjectKey{ - Namespace: pvb.Namespace, - Name: pvb.Spec.BackupStorageLocation, - }, backupLocation); err != nil { - return nil, errors.Wrap(err, "getting backup storage location") - } - - // If there's a caCert on the ObjectStorage, write it to disk so that it can - // be passed to Restic. - if backupLocation.Spec.ObjectStorage != nil && - backupLocation.Spec.ObjectStorage.CACert != nil { - - details.caCertFile, err = restic.TempCACertFile(backupLocation.Spec.ObjectStorage.CACert, pvb.Spec.BackupStorageLocation, r.FileSystem) - if err != nil { - log.WithError(err).Error("creating temporary caCert file") - } - } - cmd.CACertFile = details.caCertFile - - details.envs, err = restic.CmdEnv(backupLocation, r.CredsFileStore) - if err != nil { - return nil, errors.Wrap(err, "setting Restic command environment") - } - cmd.Env = details.envs - - // If this is a PVC, look for the most recent completed PodVolumeBackup for - // it and get its Restic snapshot ID to use as the value of the `--parent` - // flag. Without this, if the pod using the PVC (and therefore the directory - // path under /host_pods/) has changed since the PVC's last backup, Restic - // will not be able to identify a suitable parent snapshot to use, and will - // have to do a full rescan of the contents of the PVC. 
- if pvcUID, ok := pvb.Labels[velerov1api.PVCUIDLabel]; ok { - parentSnapshotID := r.getParentSnapshot(ctx, log, pvb.Namespace, pvcUID, pvb.Spec.BackupStorageLocation) - if parentSnapshotID == "" { - log.Info("No parent snapshot found for PVC, not using --parent flag for this backup") - } else { - log.WithField("parentSnapshotID", parentSnapshotID). - Info("Setting --parent flag for this backup") - cmd.ExtraFlags = append(cmd.ExtraFlags, fmt.Sprintf("--parent=%s", parentSnapshotID)) - } - } - - return cmd, nil -} - func (r *PodVolumeBackupReconciler) NewBackupProgressUpdater(pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger, ctx context.Context) *BackupProgressUpdater { return &BackupProgressUpdater{pvb, log, ctx, r.Client} } diff --git a/pkg/controller/pod_volume_backup_controller_test.go b/pkg/controller/pod_volume_backup_controller_test.go index ffc5f662c..03370f538 100644 --- a/pkg/controller/pod_volume_backup_controller_test.go +++ b/pkg/controller/pod_volume_backup_controller_test.go @@ -24,6 +24,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo/extensions/table" . 
"github.com/onsi/gomega" + "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -35,11 +36,13 @@ import ( kbclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/builder" "github.com/vmware-tanzu/velero/pkg/metrics" - "github.com/vmware-tanzu/velero/pkg/restic/mocks" velerotest "github.com/vmware-tanzu/velero/pkg/test" + "github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/uploader/provider" ) const name = "pvb-1" @@ -68,12 +71,29 @@ func bslBuilder() *builder.BackupStorageLocationBuilder { ForBackupStorageLocation(velerov1api.DefaultNamespace, "bsl-loc") } +func buildBackupRepo() *velerov1api.BackupRepository { + return &velerov1api.BackupRepository{ + Spec: velerov1api.BackupRepositorySpec{ResticIdentifier: ""}, + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov1api.SchemeGroupVersion.String(), + Kind: "BackupRepository", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1api.DefaultNamespace, + Name: fmt.Sprintf("%s-bsl-loc-dn24h", velerov1api.DefaultNamespace), + Labels: map[string]string{ + velerov1api.StorageLocationLabel: "bsl-loc", + }, + }, + } +} + var _ = Describe("PodVolumeBackup Reconciler", func() { type request struct { - pvb *velerov1api.PodVolumeBackup - pod *corev1.Pod - bsl *velerov1api.BackupStorageLocation - + pvb *velerov1api.PodVolumeBackup + pod *corev1.Pod + bsl *velerov1api.BackupStorageLocation + backupRepo *velerov1api.BackupRepository expectedProcessed bool expected *velerov1api.PodVolumeBackup expectedRequeue ctrl.Result @@ -100,31 +120,41 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { err = fakeClient.Create(ctx, test.bsl) Expect(err).To(BeNil()) + err = fakeClient.Create(ctx, test.backupRepo) + Expect(err).To(BeNil()) + fakeFS := 
velerotest.NewFakeFileSystem() pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", "", "pvb-1-volume") _, err = fakeFS.Create(pathGlob) Expect(err).To(BeNil()) + credentialFileStore, err := credentials.NewNamespacedFileStore( + fakeClient, + velerov1api.DefaultNamespace, + "/tmp/credentials", + fakeFS, + ) + // Setup reconciler Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed()) r := PodVolumeBackupReconciler{ - Client: fakeClient, - Clock: clock.NewFakeClock(now), - Metrics: metrics.NewResticServerMetrics(), - CredsFileStore: fakeCredsFileStore{}, - NodeName: "test_node", - FileSystem: fakeFS, - ResticExec: mocks.FakeResticBackupExec{}, - Log: velerotest.NewLogger(), + Client: fakeClient, + Clock: clock.NewFakeClock(now), + Metrics: metrics.NewResticServerMetrics(), + CredentialGetter: &credentials.CredentialGetter{FromFile: credentialFileStore}, + NodeName: "test_node", + FileSystem: fakeFS, + Log: velerotest.NewLogger(), + } + NewUploaderProviderFunc = func(ctx context.Context, client kbclient.Client, uploaderType, repoIdentifier string, bsl *velerov1api.BackupStorageLocation, backupRepo *velerov1api.BackupRepository, credGetter *credentials.CredentialGetter, repoKeySelector *corev1.SecretKeySelector, log logrus.FieldLogger) (provider.Provider, error) { + return &fakeProvider{}, nil } - actualResult, err := r.Reconcile(ctx, ctrl.Request{ NamespacedName: types.NamespacedName{ Namespace: velerov1api.DefaultNamespace, Name: test.pvb.Name, }, }) - Expect(actualResult).To(BeEquivalentTo(test.expectedRequeue)) if test.expectedErrMsg == "" { Expect(err).To(BeNil()) @@ -137,7 +167,6 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Name: test.pvb.Name, Namespace: test.pvb.Namespace, }, &pvb) - // Assertions if test.expected == nil { Expect(apierrors.IsNotFound(err)).To(BeTrue()) @@ -160,6 +189,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { pvb: pvbBuilder().Phase("").Node("test_node").Result(), pod: podBuilder().Result(), bsl: 
bslBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: true, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseCompleted). @@ -173,6 +203,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: true, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseCompleted). @@ -186,6 +217,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: false, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseInProgress). @@ -199,6 +231,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: false, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseCompleted). @@ -212,6 +245,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: false, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseFailed). @@ -225,6 +259,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: false, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseFailed). 
@@ -238,6 +273,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: false, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseNew). @@ -251,6 +287,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: false, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseInProgress). @@ -264,6 +301,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: false, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseCompleted). @@ -277,6 +315,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: false, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseFailed). 
@@ -286,8 +325,26 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { ) }) -type fakeCredsFileStore struct{} - -func (f fakeCredsFileStore) Path(selector *corev1.SecretKeySelector) (string, error) { - return "/fake/path", nil +type fakeProvider struct { +} + +func (f *fakeProvider) RunBackup( + ctx context.Context, + path string, + tags map[string]string, + parentSnapshot string, + updater uploader.ProgressUpdater) (string, bool, error) { + return "", false, nil +} + +func (f *fakeProvider) RunRestore( + ctx context.Context, + snapshotID string, + volumePath string, + updater uploader.ProgressUpdater) error { + return nil +} + +func (f *fakeProvider) Close(ctx context.Context) error { + return nil } diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index c8f0913db..0302d1e62 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -39,31 +39,33 @@ import ( "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/label" "github.com/vmware-tanzu/velero/pkg/podvolume" repokey "github.com/vmware-tanzu/velero/pkg/repository/keys" - "github.com/vmware-tanzu/velero/pkg/restic" + "github.com/vmware-tanzu/velero/pkg/repository/util" "github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/uploader/provider" "github.com/vmware-tanzu/velero/pkg/util/boolptr" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/kube" ) -func NewPodVolumeRestoreReconciler(logger logrus.FieldLogger, client client.Client, credentialsFileStore credentials.FileStore) *PodVolumeRestoreReconciler { +func NewPodVolumeRestoreReconciler(logger logrus.FieldLogger, client client.Client, credentialGetter *credentials.CredentialGetter) *PodVolumeRestoreReconciler { return &PodVolumeRestoreReconciler{ - Client: 
client, - logger: logger.WithField("controller", "PodVolumeRestore"), - credentialsFileStore: credentialsFileStore, - fileSystem: filesystem.NewFileSystem(), - clock: &clock.RealClock{}, + Client: client, + logger: logger.WithField("controller", "PodVolumeRestore"), + credentialGetter: credentialGetter, + fileSystem: filesystem.NewFileSystem(), + clock: &clock.RealClock{}, } } type PodVolumeRestoreReconciler struct { client.Client - logger logrus.FieldLogger - credentialsFileStore credentials.FileStore - fileSystem filesystem.Interface - clock clock.Clock + logger logrus.FieldLogger + credentialGetter *credentials.CredentialGetter + fileSystem filesystem.Interface + clock clock.Clock } type RestoreProgressUpdater struct { @@ -240,20 +242,6 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve return errors.Wrap(err, "error identifying path of volume") } - credsFile, err := c.credentialsFileStore.Path(repokey.RepoKeySelector()) - if err != nil { - return errors.Wrap(err, "error creating temp restic credentials file") - } - // ignore error since there's nothing we can do and it's a temp file. 
- defer os.Remove(credsFile) - - resticCmd := restic.RestoreCommand( - req.Spec.RepoIdentifier, - credsFile, - req.Spec.SnapshotID, - volumePath, - ) - backupLocation := &velerov1api.BackupStorageLocation{} if err := c.Get(ctx, client.ObjectKey{ Namespace: req.Namespace, @@ -262,38 +250,34 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve return errors.Wrap(err, "error getting backup storage location") } - // if there's a caCert on the ObjectStorage, write it to disk so that it can be passed to restic - var caCertFile string - if backupLocation.Spec.ObjectStorage != nil && backupLocation.Spec.ObjectStorage.CACert != nil { - caCertFile, err = restic.TempCACertFile(backupLocation.Spec.ObjectStorage.CACert, req.Spec.BackupStorageLocation, c.fileSystem) - if err != nil { - log.WithError(err).Error("Error creating temp cacert file") - } - // ignore error since there's nothing we can do and it's a temp file. - defer os.Remove(caCertFile) - } - resticCmd.CACertFile = caCertFile - - env, err := restic.CmdEnv(backupLocation, c.credentialsFileStore) + selector := labels.SelectorFromSet( + map[string]string{ + //TODO + //velerov1api.VolumeNamespaceLabel: label.GetValidName(volumeNamespace), + velerov1api.StorageLocationLabel: label.GetValidName(req.Spec.BackupStorageLocation), + //velerov1api.RepositoryTypeLabel: label.GetValidName(repositoryType), + }, + ) + backupRepo, err := util.GetBackupRepositoryByLabel(ctx, c.Client, req.Namespace, selector) if err != nil { - return errors.Wrap(err, "error setting restic cmd env") - } - resticCmd.Env = env - - // #4820: restrieve insecureSkipTLSVerify from BSL configuration for - // AWS plugin. If nothing is return, that means insecureSkipTLSVerify - // is not enable for Restic command. 
- skipTLSRet := restic.GetInsecureSkipTLSVerifyFromBSL(backupLocation, log) - if len(skipTLSRet) > 0 { - resticCmd.ExtraFlags = append(resticCmd.ExtraFlags, skipTLSRet) + return errors.Wrap(err, "error getting backup repository") } - var stdout, stderr string - - if stdout, stderr, err = restic.RunRestore(resticCmd, log, c.updateRestoreProgressFunc(req, log)); err != nil { - return errors.Wrapf(err, "error running restic restore, cmd=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr) + uploaderProv, err := provider.NewUploaderProvider(ctx, c.Client, req.Spec.UploaderType, + req.Spec.RepoIdentifier, backupLocation, &backupRepo, c.credentialGetter, repokey.RepoKeySelector(), log) + if err != nil { + return errors.Wrap(err, "error creating uploader") + } + + defer func() { + if err := uploaderProv.Close(ctx); err != nil { + log.Errorf("failed to close uploader provider with error %v", err) + } + }() + + if err = uploaderProv.RunRestore(ctx, req.Spec.SnapshotID, volumePath, c.NewRestoreProgressUpdater(req, log, ctx)); err != nil { + return errors.Wrapf(err, "error running restore err=%v", err) } - log.Debugf("Ran command=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr) // Remove the .velero directory from the restored volume (it may contain done files from previous restores // of this volume, which we don't want to carry over). 
If this fails for any reason, log and continue, since @@ -327,18 +311,6 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve return nil } -// updateRestoreProgressFunc returns a func that takes progress info and patches -// the PVR with the new progress -func (c *PodVolumeRestoreReconciler) updateRestoreProgressFunc(req *velerov1api.PodVolumeRestore, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) { - return func(progress velerov1api.PodVolumeOperationProgress) { - original := req.DeepCopy() - req.Status.Progress = progress - if err := c.Patch(context.Background(), req, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("Unable to update PodVolumeRestore progress") - } - } -} - func (r *PodVolumeRestoreReconciler) NewRestoreProgressUpdater(pvr *velerov1api.PodVolumeRestore, log logrus.FieldLogger, ctx context.Context) *RestoreProgressUpdater { return &RestoreProgressUpdater{pvr, log, ctx, r.Client} } diff --git a/pkg/repository/util/backup_repo_op.go b/pkg/repository/util/backup_repo_op.go new file mode 100644 index 000000000..0252464f4 --- /dev/null +++ b/pkg/repository/util/backup_repo_op.go @@ -0,0 +1,45 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package util + +import ( + "context" + + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/labels" + "sigs.k8s.io/controller-runtime/pkg/client" + + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" +) + +// GetBackupRepositoryByLabel which find backup repository through pvbNamespace, label +// name of BackupRepository is generated with prefix volumeNamespace-backupLocation- and end with random characters +// it could not retrieve the BackupRepository CR with namespace + name. so first list all CRs with in the pvbNamespace +// then filtering the matched CR by label +func GetBackupRepositoryByLabel(ctx context.Context, cli client.Client, pvbNamespace string, selector labels.Selector) (velerov1api.BackupRepository, error) { + backupRepoList := &velerov1api.BackupRepositoryList{} + if err := cli.List(ctx, backupRepoList, &client.ListOptions{ + Namespace: pvbNamespace, + LabelSelector: selector, + }); err != nil { + return velerov1api.BackupRepository{}, errors.Wrap(err, "error getting backup repository list") + } else if len(backupRepoList.Items) == 1 { + return backupRepoList.Items[0], nil + } else { + return velerov1api.BackupRepository{}, errors.Errorf("unexpectedly find %d BackupRepository for workload namespace %s with label selector %v", len(backupRepoList.Items), pvbNamespace, selector) + } +} diff --git a/pkg/restic/exec_commands.go b/pkg/restic/exec_commands.go index 7dd0057c0..22c1a9665 100644 --- a/pkg/restic/exec_commands.go +++ b/pkg/restic/exec_commands.go @@ -26,7 +26,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" - velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/exec" "github.com/vmware-tanzu/velero/pkg/util/filesystem" ) @@ -71,7 +71,7 @@ func GetSnapshotID(snapshotIdCmd *Command) (string, error) { // RunBackup runs a `restic backup` command and watches the output to provide // progress updates to the 
caller. -func RunBackup(backupCmd *Command, log logrus.FieldLogger, updateFunc func(velerov1api.PodVolumeOperationProgress)) (string, string, error) { +func RunBackup(backupCmd *Command, log logrus.FieldLogger, updater uploader.ProgressUpdater) (string, string, error) { // buffers for copying command stdout/err output into stdoutBuf := new(bytes.Buffer) stderrBuf := new(bytes.Buffer) @@ -104,9 +104,9 @@ func RunBackup(backupCmd *Command, log logrus.FieldLogger, updateFunc func(veler // if the line contains a non-empty bytes_done field, we can update the // caller with the progress if stat.BytesDone != 0 { - updateFunc(velerov1api.PodVolumeOperationProgress{ + updater.UpdateProgress(&uploader.UploaderProgress{ TotalBytes: stat.TotalBytes, BytesDone: stat.BytesDone, }) } } @@ -136,7 +136,7 @@ func RunBackup(backupCmd *Command, log logrus.FieldLogger, updateFunc func(veler } // update progress to 100% - updateFunc(velerov1api.PodVolumeOperationProgress{ + updater.UpdateProgress(&uploader.UploaderProgress{ TotalBytes: stat.TotalBytesProcessed, BytesDone: stat.TotalBytesProcessed, }) @@ -184,7 +184,7 @@ func getSummaryLine(b []byte) ([]byte, error) { // RunRestore runs a `restic restore` command and monitors the volume size to // provide progress updates to the caller. 
-func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updateFunc func(velerov1api.PodVolumeOperationProgress)) (string, string, error) { +func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updater uploader.ProgressUpdater) (string, string, error) { insecureTLSFlag := "" for _, extraFlag := range restoreCmd.ExtraFlags { @@ -198,7 +198,7 @@ func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updateFunc func(vel return "", "", errors.Wrap(err, "error getting snapshot size") } - updateFunc(velerov1api.PodVolumeOperationProgress{ + updater.UpdateProgress(&uploader.UploaderProgress{ TotalBytes: snapshotSize, }) @@ -216,10 +216,12 @@ func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updateFunc func(vel log.WithError(err).Errorf("error getting restic restore progress") } - updateFunc(velerov1api.PodVolumeOperationProgress{ - TotalBytes: snapshotSize, - BytesDone: volumeSize, - }) + if volumeSize != 0 { + updater.UpdateProgress(&uploader.UploaderProgress{ + TotalBytes: snapshotSize, + BytesDone: volumeSize, + }) + } case <-quit: ticker.Stop() return @@ -231,7 +233,7 @@ func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updateFunc func(vel quit <- struct{}{} // update progress to 100% - updateFunc(velerov1api.PodVolumeOperationProgress{ + updater.UpdateProgress(&uploader.UploaderProgress{ TotalBytes: snapshotSize, BytesDone: snapshotSize, }) diff --git a/pkg/restic/executer.go b/pkg/restic/executer.go deleted file mode 100644 index e89883e76..000000000 --- a/pkg/restic/executer.go +++ /dev/null @@ -1,37 +0,0 @@ -/* -Copyright The Velero Contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package restic - -import ( - "github.com/sirupsen/logrus" - - velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" -) - -// BackupExec is able to run backups. -type BackupExec struct{} - -// RunBackup is a wrapper for the restic.RunBackup function in order to be able -// to use interfaces (and swap out objects for testing purposes). -func (exec BackupExec) RunBackup(cmd *Command, log logrus.FieldLogger, updateFn func(velerov1api.PodVolumeOperationProgress)) (string, string, error) { - return RunBackup(cmd, log, updateFn) -} - -// GetSnapshotID gets the Restic snapshot ID. -func (exec BackupExec) GetSnapshotID(snapshotIdCmd *Command) (string, error) { - return GetSnapshotID(snapshotIdCmd) -} diff --git a/pkg/restic/mocks/fake_restic_executer.go b/pkg/restic/mocks/fake_restic_executer.go deleted file mode 100644 index 9dcae9574..000000000 --- a/pkg/restic/mocks/fake_restic_executer.go +++ /dev/null @@ -1,37 +0,0 @@ -/* -Copyright The Velero Contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package mocks - -import ( - "github.com/sirupsen/logrus" - - velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" - "github.com/vmware-tanzu/velero/pkg/restic" -) - -// FakeResticBackupExec represents an object that can run backups. -type FakeResticBackupExec struct{} - -// RunBackup runs a Restic backup. -func (exec FakeResticBackupExec) RunBackup(cmd *restic.Command, log logrus.FieldLogger, updateFn func(velerov1api.PodVolumeOperationProgress)) (string, string, error) { - return "", "", nil -} - -// GetSnapshotID gets the Restic snapshot ID. -func (exec FakeResticBackupExec) GetSnapshotID(cmd *restic.Command) (string, error) { - return "", nil -} \ No newline at end of file diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go index c3ab2d10e..fc5a71fa7 100644 --- a/pkg/uploader/kopia/snapshot.go +++ b/pkg/uploader/kopia/snapshot.go @@ -18,6 +18,7 @@ package kopia import ( "context" + "io/ioutil" "math" "os" "path/filepath" @@ -83,13 +84,21 @@ func setupDefaultPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceIn //Backup backup specific sourcePath and update progress func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, - parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) { + parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { if fsUploader == nil { - return nil, errors.New("get empty kopia uploader") + return nil, false, errors.New("get empty kopia uploader") } dir, err := filepath.Abs(sourcePath) if err != nil { - return nil, errors.Wrapf(err, "Invalid source path '%s'", sourcePath) + return nil, false, errors.Wrapf(err, "Invalid source path '%s'", sourcePath) + } + + // to be consistent with restic when backup empty dir returns one error for upper logic handle + dirs, err := ioutil.ReadDir(dir) + if err != nil { + return nil, false, errors.Wrapf(err, "Unable to read dir in path %s", 
dir) + } else if len(dirs) == 0 { + return nil, true, nil } sourceInfo := snapshot.SourceInfo{ @@ -97,14 +106,13 @@ func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter rep Host: udmrepo.GetRepoDomain(), Path: filepath.Clean(dir), } - rootDir, err := getLocalFSEntry(sourceInfo.Path) if err != nil { - return nil, errors.Wrap(err, "Unable to get local filesystem entry") + return nil, false, errors.Wrap(err, "Unable to get local filesystem entry") } snapID, snapshotSize, err := SnapshotSource(ctx, repoWriter, fsUploader, sourceInfo, rootDir, parentSnapshot, log, "Kopia Uploader") if err != nil { - return nil, err + return nil, false, err } snapshotInfo := &uploader.SnapshotInfo{ @@ -112,7 +120,7 @@ func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter rep Size: snapshotSize, } - return snapshotInfo, nil + return snapshotInfo, false, nil } func getLocalFSEntry(path0 string) (fs.Entry, error) { diff --git a/pkg/uploader/provider/kopia.go b/pkg/uploader/provider/kopia.go index 6384142ef..9890a8a91 100644 --- a/pkg/uploader/provider/kopia.go +++ b/pkg/uploader/provider/kopia.go @@ -97,19 +97,20 @@ func (kp *kopiaProvider) CheckContext(ctx context.Context, finishChan chan struc } } -func (kp *kopiaProvider) Close(ctx context.Context) { - kp.bkRepo.Close(ctx) +func (kp *kopiaProvider) Close(ctx context.Context) error { + return kp.bkRepo.Close(ctx) } -//RunBackup which will backup specific path and update backup progress +// RunBackup which will backup specific path and update backup progress +// return snapshotID, isEmptySnapshot, error func (kp *kopiaProvider) RunBackup( ctx context.Context, path string, tags map[string]string, parentSnapshot string, - updater uploader.ProgressUpdater) (string, error) { + updater uploader.ProgressUpdater) (string, bool, error) { if updater == nil { - return "", errors.New("Need to initial backup progress updater first") + return "", false, errors.New("Need to initial backup progress updater 
first") } log := kp.log.WithFields(logrus.Fields{ @@ -130,13 +131,16 @@ func (kp *kopiaProvider) RunBackup( close(quit) }() - snapshotInfo, err := BackupFunc(ctx, kpUploader, repoWriter, path, parentSnapshot, log) - + snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, parentSnapshot, log) if err != nil { - return "", errors.Wrapf(err, "Failed to run kopia backup") + return "", false, errors.Wrapf(err, "Failed to run kopia backup") + } else if isSnapshotEmpty { + log.Debugf("Kopia backup got empty dir with path %s", path) + return "", true, nil } else if snapshotInfo == nil { - return "", fmt.Errorf("failed to get kopia backup snapshot info for path %v", path) + return "", false, fmt.Errorf("failed to get kopia backup snapshot info for path %v", path) } + // which ensure that the statistic data of TotalBytes equal to BytesDone when finished updater.UpdateProgress( &uploader.UploaderProgress{ @@ -146,7 +150,7 @@ func (kp *kopiaProvider) RunBackup( ) log.Debugf("Kopia backup finished, snapshot ID %s, backup size %d", snapshotInfo.ID, snapshotInfo.Size) - return snapshotInfo.ID, nil + return snapshotInfo.ID, false, nil } func (kp *kopiaProvider) GetPassword(param interface{}) (string, error) { diff --git a/pkg/uploader/provider/kopia_test.go b/pkg/uploader/provider/kopia_test.go index 747acbfb4..955bf83f4 100644 --- a/pkg/uploader/provider/kopia_test.go +++ b/pkg/uploader/provider/kopia_test.go @@ -25,10 +25,10 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" - "github.com/vmware-tanzu/velero/pkg/controller" "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/uploader/kopia" @@ -37,30 +37,30 @@ import ( func TestRunBackup(t 
*testing.T) { var kp kopiaProvider kp.log = logrus.New() - updater := controller.BackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)} + updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)} testCases := []struct { name string - hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) + hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) notError bool }{ { name: "success to backup", - hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) { - return &uploader.SnapshotInfo{}, nil + hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { + return &uploader.SnapshotInfo{}, false, nil }, notError: true, }, { name: "get error to backup", - hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) { - return &uploader.SnapshotInfo{}, errors.New("failed to backup") + hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { + return &uploader.SnapshotInfo{}, false, 
errors.New("failed to backup") }, notError: false, }, { name: "got empty snapshot", - hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) { - return nil, nil + hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { + return nil, true, errors.New("snapshot is empty") }, notError: false, }, @@ -68,7 +68,7 @@ func TestRunBackup(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { BackupFunc = tc.hookBackupFunc - _, err := kp.RunBackup(context.Background(), "var", nil, "", &updater) + _, _, err := kp.RunBackup(context.Background(), "var", nil, "", &updater) if tc.notError { assert.NoError(t, err) } else { @@ -81,7 +81,7 @@ func TestRunBackup(t *testing.T) { func TestRunRestore(t *testing.T) { var kp kopiaProvider kp.log = logrus.New() - updater := controller.RestoreProgressUpdater{PodVolumeRestore: &velerov1api.PodVolumeRestore{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)} + updater := FakeRestoreProgressUpdater{PodVolumeRestore: &velerov1api.PodVolumeRestore{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)} testCases := []struct { name string @@ -116,3 +116,21 @@ func TestRunRestore(t *testing.T) { }) } } + +type FakeBackupProgressUpdater struct { + PodVolumeBackup *velerov1api.PodVolumeBackup + Log logrus.FieldLogger + Ctx context.Context + Cli client.Client +} + +func (f *FakeBackupProgressUpdater) UpdateProgress(p *uploader.UploaderProgress) {} + +type FakeRestoreProgressUpdater struct { + PodVolumeRestore *velerov1api.PodVolumeRestore + Log logrus.FieldLogger + Ctx context.Context + Cli client.Client +} + +func (f *FakeRestoreProgressUpdater) 
UpdateProgress(p *uploader.UploaderProgress) {} diff --git a/pkg/uploader/provider/provider.go b/pkg/uploader/provider/provider.go index b435708f6..1a74214f1 100644 --- a/pkg/uploader/provider/provider.go +++ b/pkg/uploader/provider/provider.go @@ -20,11 +20,16 @@ import ( "context" "time" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/repository/provider" "github.com/vmware-tanzu/velero/pkg/uploader" ) @@ -33,14 +38,14 @@ const backupProgressCheckInterval = 10 * time.Second // Provider which is designed for one pod volumn to do the backup or restore type Provider interface { - // RunBackup which will do backup for one specific volumn and return snapshotID error + // RunBackup which will do backup for one specific volume and return snapshotID, isSnapshotEmpty, error // updater is used for updating backup progress which implement by third-party RunBackup( ctx context.Context, path string, tags map[string]string, parentSnapshot string, - updater uploader.ProgressUpdater) (string, error) + updater uploader.ProgressUpdater) (string, bool, error) // RunRestore which will do restore for one specific volumn with given snapshot id and return error // updater is used for updating backup progress which implement by third-party RunRestore( @@ -49,23 +54,33 @@ type Provider interface { volumePath string, updater uploader.ProgressUpdater) error // Close which will close related repository - Close(ctx context.Context) + Close(ctx context.Context) error } // NewUploaderProvider initialize provider with specific uploaderType func NewUploaderProvider( ctx context.Context, + client client.Client, uploaderType string, repoIdentifier string, bsl *velerov1api.BackupStorageLocation, - backupReo *velerov1api.BackupRepository, + backupRepo 
*velerov1api.BackupRepository, credGetter *credentials.CredentialGetter, repoKeySelector *v1.SecretKeySelector, log logrus.FieldLogger, ) (Provider, error) { + if credGetter == nil || credGetter.FromFile == nil { + return nil, errors.New("uninitialized FileStore credential is not supported") + } if uploaderType == uploader.KopiaType { - return NewResticUploaderProvider(repoIdentifier, bsl, credGetter, repoKeySelector, log) + if err := provider.NewUnifiedRepoProvider(*credGetter, log).ConnectToRepo(ctx, provider.RepoParam{BackupLocation: bsl, BackupRepo: backupRepo}); err != nil { + return nil, errors.Wrap(err, "failed to connect repository") + } + return NewKopiaUploaderProvider(ctx, credGetter, backupRepo, log) } else { - return NewKopiaUploaderProvider(ctx, credGetter, backupReo, log) + if err := provider.NewResticRepositoryProvider(credGetter.FromFile, nil, log).ConnectToRepo(ctx, provider.RepoParam{BackupLocation: bsl, BackupRepo: backupRepo}); err != nil { + return nil, errors.Wrap(err, "failed to connect repository") + } + return NewResticUploaderProvider(repoIdentifier, bsl, credGetter, repoKeySelector, log) } } diff --git a/pkg/uploader/provider/restic.go b/pkg/uploader/provider/restic.go index f908077a3..549d39577 100644 --- a/pkg/uploader/provider/restic.go +++ b/pkg/uploader/provider/restic.go @@ -13,16 +13,40 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
*/ + package provider import ( + "context" + "fmt" + "os" + "strings" + + "github.com/pkg/errors" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/restic" + "github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/util/filesystem" ) +// mainly used to make testing more convenient +var ResticBackupCMDFunc = restic.BackupCommand +var ResticRestoreCMDFunc = restic.RestoreCommand + +type resticProvider struct { + repoIdentifier string + credentialsFile string + caCertFile string + cmdEnv []string + extraFlags []string + bsl *velerov1api.BackupStorageLocation + log logrus.FieldLogger +} + func NewResticUploaderProvider( repoIdentifier string, bsl *velerov1api.BackupStorageLocation, @@ -30,5 +54,120 @@ func NewResticUploaderProvider( repoKeySelector *v1.SecretKeySelector, log logrus.FieldLogger, ) (Provider, error) { - return nil, nil //TODO + provider := resticProvider{ + repoIdentifier: repoIdentifier, + bsl: bsl, + log: log, + } + + var err error + provider.credentialsFile, err = credGetter.FromFile.Path(repoKeySelector) + if err != nil { + return nil, errors.Wrap(err, "error creating temp restic credentials file") + } + + // if there's a caCert on the ObjectStorage, write it to disk so that it can be passed to restic + if bsl.Spec.ObjectStorage != nil && bsl.Spec.ObjectStorage.CACert != nil { + provider.caCertFile, err = restic.TempCACertFile(bsl.Spec.ObjectStorage.CACert, bsl.Name, filesystem.NewFileSystem()) + if err != nil { + return nil, errors.Wrap(err, "error create temp cert file") + } + } + + provider.cmdEnv, err = restic.CmdEnv(bsl, credGetter.FromFile) + if err != nil { + return nil, errors.Wrap(err, "error generating repository cmnd env") + } + + // #4820: restrieve insecureSkipTLSVerify from BSL configuration for + // AWS plugin. 
If nothing is return, that means insecureSkipTLSVerify + // is not enable for Restic command. + skipTLSRet := restic.GetInsecureSkipTLSVerifyFromBSL(bsl, log) + if len(skipTLSRet) > 0 { + provider.extraFlags = append(provider.extraFlags, skipTLSRet) + } + + return &provider, nil +} + +func (rp *resticProvider) Close(ctx context.Context) error { + if err := os.Remove(rp.credentialsFile); err != nil { + rp.log.Warnf("Failed to remove file %s with err %v", rp.credentialsFile, err) + } + if err := os.Remove(rp.caCertFile); err != nil { + rp.log.Warnf("Failed to remove file %s with err %v", rp.caCertFile, err) + } + return nil +} + +// RunBackup runs a `backup` command and watches the output to provide +// progress updates to the caller and return snapshotID, isEmptySnapshot, error +func (rp *resticProvider) RunBackup( + ctx context.Context, + path string, + tags map[string]string, + parentSnapshot string, + updater uploader.ProgressUpdater) (string, bool, error) { + if updater == nil { + return "", false, errors.New("Need to initial backup progress updater first") + } + + log := rp.log.WithFields(logrus.Fields{ + "path": path, + "parentSnapshot": parentSnapshot, + }) + + backupCmd := ResticBackupCMDFunc(rp.repoIdentifier, rp.credentialsFile, path, tags) + backupCmd.Env = rp.cmdEnv + backupCmd.CACertFile = rp.caCertFile + backupCmd.ExtraFlags = rp.extraFlags + if parentSnapshot != "" { + backupCmd.ExtraFlags = append(backupCmd.ExtraFlags, fmt.Sprintf("--parent=%s", parentSnapshot)) + } + + summary, stderrBuf, err := restic.RunBackup(backupCmd, log, updater) + if err != nil { + if strings.Contains(err.Error(), "snapshot is empty") { + log.Debugf("Restic backup got empty dir with %s path", path) + return "", true, nil + } + return "", false, errors.WithStack(fmt.Errorf("error running restic backup with error: %v", err)) + } + // GetSnapshotID + snapshotIdCmd := restic.GetSnapshotCommand(rp.repoIdentifier, rp.credentialsFile, tags) + snapshotIdCmd.Env = rp.cmdEnv + 
snapshotIdCmd.CACertFile = rp.caCertFile + + snapshotID, err := restic.GetSnapshotID(snapshotIdCmd) + if err != nil { + return "", false, errors.WithStack(fmt.Errorf("error getting snapshot id with error: %v", err)) + } + log.Debugf("Run command=%s, stdout=%s, stderr=%s", backupCmd.String(), summary, stderrBuf) + return snapshotID, false, nil +} + +// RunRestore runs a `restore` command and monitors the volume size to +// provide progress updates to the caller. +func (rp *resticProvider) RunRestore( + ctx context.Context, + snapshotID string, + volumePath string, + updater uploader.ProgressUpdater) error { + if updater == nil { + return errors.New("Need to initial restore progress updater first") + } + log := rp.log.WithFields(logrus.Fields{ + "snapshotID": snapshotID, + "volumePath": volumePath, + }) + + restoreCmd := ResticRestoreCMDFunc(rp.repoIdentifier, rp.credentialsFile, snapshotID, volumePath) + restoreCmd.Env = rp.cmdEnv + restoreCmd.CACertFile = rp.caCertFile + restoreCmd.ExtraFlags = rp.extraFlags + + stdout, stderr, err := restic.RunRestore(restoreCmd, log, updater) + + log.Debugf("Run command=%s, stdout=%s, stderr=%s", restoreCmd.String(), stdout, stderr) + return err } diff --git a/pkg/uploader/provider/restic_test.go b/pkg/uploader/provider/restic_test.go new file mode 100644 index 000000000..042602777 --- /dev/null +++ b/pkg/uploader/provider/restic_test.go @@ -0,0 +1,106 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package provider + +import ( + "context" + "strings" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme" + "github.com/vmware-tanzu/velero/pkg/restic" + "github.com/vmware-tanzu/velero/pkg/uploader" +) + +func TestResticRunBackup(t *testing.T) { + var rp resticProvider + rp.log = logrus.New() + updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: rp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)} + testCases := []struct { + name string + hookBackupFunc func(repoIdentifier string, passwordFile string, path string, tags map[string]string) *restic.Command + hookRunBackupFunc func(backupCmd *restic.Command, log logrus.FieldLogger, updater uploader.ProgressUpdater) (string, string, error) + errorHandleFunc func(err error) bool + }{ + { + name: "wrong restic execute command", + hookBackupFunc: func(repoIdentifier string, passwordFile string, path string, tags map[string]string) *restic.Command { + return &restic.Command{Command: "date"} + }, + errorHandleFunc: func(err error) bool { + return err != nil && strings.Contains(err.Error(), "executable file not found in") + }, + }, + { + name: "wrong parsing json summary content", + hookBackupFunc: func(repoIdentifier string, passwordFile string, path string, tags map[string]string) *restic.Command { + return &restic.Command{Command: "version"} + }, + errorHandleFunc: func(err error) bool { + return err != nil && strings.Contains(err.Error(), "executable file not found in") + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ResticBackupCMDFunc = tc.hookBackupFunc + _, _, err := rp.RunBackup(context.Background(), "var", nil, "", &updater) + rp.log.Infof("test name %v error %v", tc.name, err) + require.Equal(t, true, 
tc.errorHandleFunc(err)) + }) + } +} + +func TestResticRunRestore(t *testing.T) { + var rp resticProvider + rp.log = logrus.New() + updater := FakeRestoreProgressUpdater{PodVolumeRestore: &velerov1api.PodVolumeRestore{}, Log: rp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)} + ResticRestoreCMDFunc = func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command { + return &restic.Command{Args: []string{""}} + } + testCases := []struct { + name string + hookResticRestoreFunc func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command + errorHandleFunc func(err error) bool + }{ + { + name: "wrong restic execute command", + hookResticRestoreFunc: func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command { + return &restic.Command{Args: []string{"date"}} + }, + errorHandleFunc: func(err error) bool { + return err != nil && strings.Contains(err.Error(), "executable file not found ") + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ResticRestoreCMDFunc = tc.hookResticRestoreFunc + err := rp.RunRestore(context.Background(), "", "var", &updater) + rp.log.Infof("test name %v error %v", tc.name, err) + require.Equal(t, true, tc.errorHandleFunc(err)) + }) + } + +}