diff --git a/pkg/builder/backup_repository_builer.go b/pkg/builder/backup_repository_builer.go deleted file mode 100644 index a78f3238a..000000000 --- a/pkg/builder/backup_repository_builer.go +++ /dev/null @@ -1,59 +0,0 @@ -/* -Copyright the Velero contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package builder - -import ( - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" -) - -// BackupRepositoryBuilder builds BackupRepository objects. -type BackupRepositoryBuilder struct { - object *velerov1api.BackupRepository -} - -// ForBackupRepository is the constructor for a BackupRepositoryBuilder. -func ForBackupRepository(ns, name string) *BackupRepositoryBuilder { - return &BackupRepositoryBuilder{ - object: &velerov1api.BackupRepository{ - Spec: velerov1api.BackupRepositorySpec{ResticIdentifier: ""}, - TypeMeta: metav1.TypeMeta{ - APIVersion: velerov1api.SchemeGroupVersion.String(), - Kind: "BackupRepository", - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: ns, - Name: name, - }, - }, - } -} - -// Result returns the built BackupRepository. -func (b *BackupRepositoryBuilder) Result() *velerov1api.BackupRepository { - return b.object -} - -// ObjectMeta applies functional options to the BackupRepository's ObjectMeta. -func (b *BackupRepositoryBuilder) ObjectMeta(opts ...ObjectMetaOpt) *BackupRepositoryBuilder { - for _, opt := range opts { - opt(b.object) - } - - return b -} diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index 97b099341..4e07edb89 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -19,7 +19,6 @@ package controller import ( "context" "fmt" - "strings" "time" "github.com/pkg/errors" @@ -27,6 +26,7 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" ctrl "sigs.k8s.io/controller-runtime" @@ -34,8 +34,10 @@ import ( "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/label" "github.com/vmware-tanzu/velero/pkg/metrics" repokey "github.com/vmware-tanzu/velero/pkg/repository/keys" + "github.com/vmware-tanzu/velero/pkg/repository/util" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/uploader/provider" "github.com/vmware-tanzu/velero/pkg/util/filesystem" @@ -143,30 +145,17 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ }, backupLocation); err != nil { return ctrl.Result{}, errors.Wrap(err, "error getting backup storage location") } - - // name of ResticRepository is generated with prefix volumeNamespace-backupLocation- and end with random characters - // it could not retrieve the ResticRepository CR with namespace + name. so first list all CRs with in the volumeNamespace - // then filtering the matched CR with prefix volumeNamespace-backupLocation- - backupRepos := &velerov1api.BackupRepositoryList{} - var backupRepo velerov1api.BackupRepository - isFoundRepo := false - if r.Client.List(ctx, backupRepos, &client.ListOptions{ - Namespace: pvb.Namespace, - }); err != nil { + selector := labels.SelectorFromSet( + map[string]string{ + //TODO + //velerov1api.VolumeNamespaceLabel: label.GetValidName(volumeNamespace), + velerov1api.StorageLocationLabel: label.GetValidName(pvb.Spec.BackupStorageLocation), + //velerov1api.RepositoryTypeLabel: label.GetValidName(repositoryType), + }, + ) + backupRepo, err := util.GetBackupRepositoryByLabel(ctx, r.Client, pvb.Namespace, selector) + if err != nil { return ctrl.Result{}, errors.Wrap(err, "error getting backup repository") - } else if len(backupRepos.Items) == 0 { - return ctrl.Result{}, errors.Errorf("find empty BackupRepository found for workload namespace %s, backup storage location %s", pvb.Namespace, pvb.Spec.BackupStorageLocation) - } else { - for _, repo := range backupRepos.Items { - if strings.HasPrefix(repo.Name, fmt.Sprintf("%s-%s-", pvb.Spec.Pod.Namespace, pvb.Spec.BackupStorageLocation)) { - backupRepo = repo - isFoundRepo = true - break - } - } - if !isFoundRepo { - return ctrl.Result{}, errors.Errorf("could not found match BackupRepository for workload namespace %s, backup storage location %s", pvb.Namespace, pvb.Spec.BackupStorageLocation) - } } var uploaderProv provider.Provider @@ -177,17 +166,17 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ } // If this is a PVC, look for the most recent completed pod volume backup for it and get - // its restic snapshot ID to use as the value of the `--parent` flag. Without this, + // its snapshot ID to do new backup based on it. Without this, // if the pod using the PVC (and therefore the directory path under /host_pods/) has - // changed since the PVC's last backup, restic will not be able to identify a suitable + // changed since the PVC's last backup, for backup, it will not be able to identify a suitable // parent snapshot to use, and will have to do a full rescan of the contents of the PVC. var parentSnapshotID string if pvcUID, ok := pvb.Labels[velerov1api.PVCUIDLabel]; ok { parentSnapshotID = r.getParentSnapshot(ctx, log, pvb.Namespace, pvcUID, pvb.Spec.BackupStorageLocation) if parentSnapshotID == "" { - log.Info("No parent snapshot found for PVC, not using --parent flag for this backup") + log.Info("No parent snapshot found for PVC, not based on parent snapshot for this backup") } else { - log.WithField("parentSnapshotID", parentSnapshotID).Info("Setting --parent flag for this backup") + log.WithField("parentSnapshotID", parentSnapshotID).Info("Based on parent snapshot for this backup") } } @@ -197,14 +186,9 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ } }() - var emptySnapshot bool - snapshotID, err := uploaderProv.RunBackup(ctx, path, pvb.Spec.Tags, parentSnapshotID, r.NewBackupProgressUpdater(&pvb, log, ctx)) + snapshotID, emptySnapshot, err := uploaderProv.RunBackup(ctx, path, pvb.Spec.Tags, parentSnapshotID, r.NewBackupProgressUpdater(&pvb, log, ctx)) if err != nil { - if strings.Contains(err.Error(), "snapshot is empty") { - emptySnapshot = true - } else { - return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("running Restic backup, stderr=%v", err), log) - } + return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("running backup, stderr=%v", err), log) } // Update status to Completed with path & snapshot ID. @@ -224,7 +208,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ latencyDuration := pvb.Status.CompletionTimestamp.Time.Sub(pvb.Status.StartTimestamp.Time) latencySeconds := float64(latencyDuration / time.Second) backupName := fmt.Sprintf("%s/%s", req.Namespace, pvb.OwnerReferences[0].Name) - generateOpName := fmt.Sprintf("%s-%s-%s-%s-backup", pvb.Name, backupRepo.Name, pvb.Spec.BackupStorageLocation, pvb.Namespace) + generateOpName := fmt.Sprintf("%s-%s-%s-%s-%s-backup", pvb.Name, backupRepo.Name, pvb.Spec.BackupStorageLocation, pvb.Namespace, pvb.Spec.UploaderType) r.Metrics.ObserveResticOpLatency(r.NodeName, req.Name, generateOpName, backupName, latencySeconds) r.Metrics.RegisterResticOpLatencyGauge(r.NodeName, req.Name, generateOpName, backupName, latencySeconds) r.Metrics.RegisterPodVolumeBackupDequeue(r.NodeName) diff --git a/pkg/controller/pod_volume_backup_controller_test.go b/pkg/controller/pod_volume_backup_controller_test.go index 794606cb0..03370f538 100644 --- a/pkg/controller/pod_volume_backup_controller_test.go +++ b/pkg/controller/pod_volume_backup_controller_test.go @@ -71,8 +71,21 @@ func bslBuilder() *builder.BackupStorageLocationBuilder { ForBackupStorageLocation(velerov1api.DefaultNamespace, "bsl-loc") } -func backupRepoBuilder() *builder.BackupRepositoryBuilder { - return builder.ForBackupRepository(velerov1api.DefaultNamespace, fmt.Sprintf("%s-bsl-loc-dn24h", velerov1api.DefaultNamespace)) +func buildBackupRepo() *velerov1api.BackupRepository { + return &velerov1api.BackupRepository{ + Spec: velerov1api.BackupRepositorySpec{ResticIdentifier: ""}, + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov1api.SchemeGroupVersion.String(), + Kind: "BackupRepository", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1api.DefaultNamespace, + Name: fmt.Sprintf("%s-bsl-loc-dn24h", velerov1api.DefaultNamespace), + Labels: map[string]string{ + velerov1api.StorageLocationLabel: "bsl-loc", + }, + }, + } } var _ = Describe("PodVolumeBackup Reconciler", func() { @@ -176,7 +189,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { pvb: pvbBuilder().Phase("").Node("test_node").Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), - backupRepo: backupRepoBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: true, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseCompleted). @@ -190,7 +203,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), - backupRepo: backupRepoBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: true, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseCompleted). @@ -204,7 +217,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), - backupRepo: backupRepoBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: false, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseInProgress). @@ -218,7 +231,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), - backupRepo: backupRepoBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: false, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseCompleted). @@ -232,7 +245,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), - backupRepo: backupRepoBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: false, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseFailed). @@ -246,7 +259,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), - backupRepo: backupRepoBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: false, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseFailed). @@ -260,7 +273,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), - backupRepo: backupRepoBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: false, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseNew). @@ -274,7 +287,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), - backupRepo: backupRepoBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: false, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseInProgress). @@ -288,7 +301,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), - backupRepo: backupRepoBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: false, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseCompleted). @@ -302,7 +315,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), pod: podBuilder().Result(), bsl: bslBuilder().Result(), - backupRepo: backupRepoBuilder().Result(), + backupRepo: buildBackupRepo(), expectedProcessed: false, expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). Phase(velerov1api.PodVolumeBackupPhaseFailed). @@ -320,8 +333,8 @@ func (f *fakeProvider) RunBackup( path string, tags map[string]string, parentSnapshot string, - updater uploader.ProgressUpdater) (string, error) { - return "", nil + updater uploader.ProgressUpdater) (string, bool, error) { + return "", false, nil } func (f *fakeProvider) RunRestore( diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index e3e83fb8e..0302d1e62 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -22,7 +22,6 @@ import ( "io/ioutil" "os" "path/filepath" - "strings" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -40,9 +39,10 @@ import ( "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/label" "github.com/vmware-tanzu/velero/pkg/podvolume" repokey "github.com/vmware-tanzu/velero/pkg/repository/keys" - "github.com/vmware-tanzu/velero/pkg/restic" + "github.com/vmware-tanzu/velero/pkg/repository/util" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/uploader/provider" "github.com/vmware-tanzu/velero/pkg/util/boolptr" @@ -250,29 +250,17 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve return errors.Wrap(err, "error getting backup storage location") } - // name of ResticRepository is generated with prefix volumeNamespace-backupLocation- and end with random characters - // it could not retrieve the ResticRepository CR with namespace + name. so first list all CRs with in the volumeNamespace - // then filtering the matched CR with prefix volumeNamespace-backupLocation- - backupRepos := &velerov1api.BackupRepositoryList{} - var backupRepo velerov1api.BackupRepository - isFoundRepo := false - if c.List(ctx, backupRepos, &client.ListOptions{ - Namespace: req.Namespace, - }); err != nil { + selector := labels.SelectorFromSet( + map[string]string{ + //TODO + //velerov1api.VolumeNamespaceLabel: label.GetValidName(volumeNamespace), + velerov1api.StorageLocationLabel: label.GetValidName(req.Spec.BackupStorageLocation), + //velerov1api.RepositoryTypeLabel: label.GetValidName(repositoryType), + }, + ) + backupRepo, err := util.GetBackupRepositoryByLabel(ctx, c.Client, req.Namespace, selector) + if err != nil { return errors.Wrap(err, "error getting backup repository") - } else if len(backupRepos.Items) == 0 { - return errors.Errorf("find empty BackupRepository found for workload namespace %s, backup storage location %s", req.Namespace, req.Spec.BackupStorageLocation) - } else { - for _, repo := range backupRepos.Items { - if strings.HasPrefix(repo.Name, fmt.Sprintf("%s-%s-", req.Spec.Pod.Namespace, req.Spec.BackupStorageLocation)) { - backupRepo = repo - isFoundRepo = true - break - } - } - if !isFoundRepo { - return errors.Errorf("could not found match BackupRepository for workload namespace %s, backup storage location %s", req.Namespace, req.Spec.BackupStorageLocation) - } } uploaderProv, err := provider.NewUploaderProvider(ctx, c.Client, req.Spec.UploaderType, @@ -288,7 +276,7 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve }() if err = uploaderProv.RunRestore(ctx, req.Spec.SnapshotID, volumePath, c.NewRestoreProgressUpdater(req, log, ctx)); err != nil { - return errors.Wrapf(err, "error running restic restore err=%v", err) + return errors.Wrapf(err, "error running restore err=%v", err) } // Remove the .velero directory from the restored volume (it may contain done files from previous restores diff --git a/pkg/repository/util/backup_repo_op.go b/pkg/repository/util/backup_repo_op.go new file mode 100644 index 000000000..0252464f4 --- /dev/null +++ b/pkg/repository/util/backup_repo_op.go @@ -0,0 +1,45 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "context" + + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/labels" + "sigs.k8s.io/controller-runtime/pkg/client" + + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" +) + +// GetBackupRepositoryByLabel which find backup repository through pvbNamespace, label +// name of BackupRepository is generated with prefix volumeNamespace-backupLocation- and end with random characters +// it could not retrieve the BackupRepository CR with namespace + name. so first list all CRs with in the pvbNamespace +// then filtering the matched CR by label +func GetBackupRepositoryByLabel(ctx context.Context, cli client.Client, pvbNamespace string, selector labels.Selector) (velerov1api.BackupRepository, error) { + backupRepoList := &velerov1api.BackupRepositoryList{} + if err := cli.List(ctx, backupRepoList, &client.ListOptions{ + Namespace: pvbNamespace, + LabelSelector: selector, + }); err != nil { + return velerov1api.BackupRepository{}, errors.Wrap(err, "error getting backup repository list") + } else if len(backupRepoList.Items) == 1 { + return backupRepoList.Items[0], nil + } else { + return velerov1api.BackupRepository{}, errors.Errorf("unexpectedly find %d BackupRepository for workload namespace %s with label selector %v", len(backupRepoList.Items), pvbNamespace, selector) + } +} diff --git a/pkg/restic/exec_commands.go b/pkg/restic/exec_commands.go index 1a4b055bf..22c1a9665 100644 --- a/pkg/restic/exec_commands.go +++ b/pkg/restic/exec_commands.go @@ -20,10 +20,13 @@ import ( "bytes" "encoding/json" "fmt" + "strings" "time" "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/exec" "github.com/vmware-tanzu/velero/pkg/util/filesystem" ) @@ -66,7 +69,82 @@ func GetSnapshotID(snapshotIdCmd *Command) (string, error) { return snapshots[0].ShortID, nil } -func DecodeBackupStatusLine(lastLine []byte) (backupStatusLine, error) { +// RunBackup runs a `restic backup` command and watches the output to provide +// progress updates to the caller. +func RunBackup(backupCmd *Command, log logrus.FieldLogger, updater uploader.ProgressUpdater) (string, string, error) { + // buffers for copying command stdout/err output into + stdoutBuf := new(bytes.Buffer) + stderrBuf := new(bytes.Buffer) + + // create a channel to signal when to end the goroutine scanning for progress + // updates + quit := make(chan struct{}) + + cmd := backupCmd.Cmd() + cmd.Stdout = stdoutBuf + cmd.Stderr = stderrBuf + + err := cmd.Start() + if err != nil { + return stdoutBuf.String(), stderrBuf.String(), err + } + + go func() { + ticker := time.NewTicker(backupProgressCheckInterval) + for { + select { + case <-ticker.C: + lastLine := getLastLine(stdoutBuf.Bytes()) + if len(lastLine) > 0 { + stat, err := decodeBackupStatusLine(lastLine) + if err != nil { + log.WithError(err).Errorf("error getting restic backup progress") + } + + // if the line contains a non-empty bytes_done field, we can update the + // caller with the progress + if stat.BytesDone != 0 { + updater.UpdateProgress(&uploader.UploaderProgress{ + TotalBytes: stat.TotalBytesProcessed, + BytesDone: stat.TotalBytesProcessed, + }) + } + } + case <-quit: + ticker.Stop() + return + } + } + }() + + err = cmd.Wait() + if err != nil { + return stdoutBuf.String(), stderrBuf.String(), err + } + quit <- struct{}{} + + summary, err := getSummaryLine(stdoutBuf.Bytes()) + if err != nil { + return stdoutBuf.String(), stderrBuf.String(), err + } + stat, err := decodeBackupStatusLine(summary) + if err != nil { + return stdoutBuf.String(), stderrBuf.String(), err + } + if stat.MessageType != "summary" { + return stdoutBuf.String(), stderrBuf.String(), errors.WithStack(fmt.Errorf("error getting restic backup summary: %s", string(summary))) + } + + // update progress to 100% + updater.UpdateProgress(&uploader.UploaderProgress{ + TotalBytes: stat.TotalBytesProcessed, + BytesDone: stat.TotalBytesProcessed, + }) + + return string(summary), stderrBuf.String(), nil +} + +func decodeBackupStatusLine(lastLine []byte) (backupStatusLine, error) { var stat backupStatusLine if err := json.Unmarshal(lastLine, &stat); err != nil { return stat, errors.Wrapf(err, "unable to decode backup JSON line: %s", string(lastLine)) @@ -74,10 +152,10 @@ func DecodeBackupStatusLine(lastLine []byte) (backupStatusLine, error) { return stat, nil } -// GetLastLine returns the last line of a byte array. The string is assumed to +// getLastLine returns the last line of a byte array. The string is assumed to // have a newline at the end of it, so this returns the substring between the // last two newlines. -func GetLastLine(b []byte) []byte { +func getLastLine(b []byte) []byte { if b == nil || len(b) == 0 { return []byte("") } @@ -86,12 +164,12 @@ func GetLastLine(b []byte) []byte { return b[lastNewLineIdx+1 : len(b)-1] } -// GetSummaryLine looks for the summary JSON line +// getSummaryLine looks for the summary JSON line // (`{"message_type:"summary",...`) in the restic backup command output. Due to // an issue in Restic, this might not always be the last line // (https://github.com/restic/restic/issues/2389). It returns an error if it // can't be found. -func GetSummaryLine(b []byte) ([]byte, error) { +func getSummaryLine(b []byte) ([]byte, error) { summaryLineIdx := bytes.LastIndex(b, []byte(`{"message_type":"summary"`)) if summaryLineIdx < 0 { return nil, errors.New("unable to find summary in restic backup command output") @@ -104,7 +182,66 @@ func GetSummaryLine(b []byte) ([]byte, error) { return b[summaryLineIdx : summaryLineIdx+newLineIdx], nil } -func GetSnapshotSize(repoIdentifier, passwordFile, caCertFile, snapshotID string, env []string, insecureTLS string) (int64, error) { +// RunRestore runs a `restic restore` command and monitors the volume size to +// provide progress updates to the caller. +func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updater uploader.ProgressUpdater) (string, string, error) { + insecureTLSFlag := "" + + for _, extraFlag := range restoreCmd.ExtraFlags { + if strings.Contains(extraFlag, resticInsecureTLSFlag) { + insecureTLSFlag = extraFlag + } + } + + snapshotSize, err := getSnapshotSize(restoreCmd.RepoIdentifier, restoreCmd.PasswordFile, restoreCmd.CACertFile, restoreCmd.Args[0], restoreCmd.Env, insecureTLSFlag) + if err != nil { + return "", "", errors.Wrap(err, "error getting snapshot size") + } + + updater.UpdateProgress(&uploader.UploaderProgress{ + TotalBytes: snapshotSize, + }) + + // create a channel to signal when to end the goroutine scanning for progress + // updates + quit := make(chan struct{}) + + go func() { + ticker := time.NewTicker(restoreProgressCheckInterval) + for { + select { + case <-ticker.C: + volumeSize, err := getVolumeSize(restoreCmd.Dir) + if err != nil { + log.WithError(err).Errorf("error getting restic restore progress") + } + + if volumeSize != 0 { + updater.UpdateProgress(&uploader.UploaderProgress{ + TotalBytes: snapshotSize, + BytesDone: volumeSize, + }) + } + case <-quit: + ticker.Stop() + return + } + } + }() + + stdout, stderr, err := exec.RunCommand(restoreCmd.Cmd()) + quit <- struct{}{} + + // update progress to 100% + updater.UpdateProgress(&uploader.UploaderProgress{ + TotalBytes: snapshotSize, + BytesDone: snapshotSize, + }) + + return stdout, stderr, err +} + +func getSnapshotSize(repoIdentifier, passwordFile, caCertFile, snapshotID string, env []string, insecureTLS string) (int64, error) { cmd := StatsCommand(repoIdentifier, passwordFile, snapshotID) cmd.Env = env cmd.CACertFile = caCertFile @@ -129,7 +266,7 @@ func GetSnapshotSize(repoIdentifier, passwordFile, caCertFile, snapshotID string return snapshotStats.TotalSize, nil } -func GetVolumeSize(path string) (int64, error) { +func getVolumeSize(path string) (int64, error) { var size int64 files, err := fileSystem.ReadDir(path) @@ -139,7 +276,7 @@ func GetVolumeSize(path string) (int64, error) { for _, file := range files { if file.IsDir() { - s, err := GetVolumeSize(fmt.Sprintf("%s/%s", path, file.Name())) + s, err := getVolumeSize(fmt.Sprintf("%s/%s", path, file.Name())) if err != nil { return 0, err } diff --git a/pkg/restic/exec_commands_test.go b/pkg/restic/exec_commands_test.go index e8ca9076f..0353f3e2c 100644 --- a/pkg/restic/exec_commands_test.go +++ b/pkg/restic/exec_commands_test.go @@ -52,7 +52,7 @@ func Test_getSummaryLine(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - summary, err := GetSummaryLine([]byte(tt.output)) + summary, err := getSummaryLine([]byte(tt.output)) if tt.wantErr { assert.Error(t, err) } else { @@ -79,7 +79,7 @@ third line } for _, tt := range tests { t.Run(tt.want, func(t *testing.T) { - assert.Equal(t, []byte(tt.want), GetLastLine([]byte(tt.output))) + assert.Equal(t, []byte(tt.want), getLastLine([]byte(tt.output))) }) } } @@ -103,7 +103,7 @@ func Test_getVolumeSize(t *testing.T) { fileSystem = fakefs defer func() { fileSystem = filesystem.NewFileSystem() }() - actualSize, err := GetVolumeSize("/") + actualSize, err := getVolumeSize("/") assert.NoError(t, err) assert.Equal(t, expectedSize, actualSize) diff --git a/pkg/restic/mocks/fake_restic_executer.go b/pkg/restic/mocks/fake_restic_executer.go deleted file mode 100644 index 9dcae9574..000000000 --- a/pkg/restic/mocks/fake_restic_executer.go +++ /dev/null @@ -1,37 +0,0 @@ -/* -Copyright The Velero Contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package mocks - -import ( - "github.com/sirupsen/logrus" - - velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" - "github.com/vmware-tanzu/velero/pkg/restic" -) - -// FakeResticBackupExec represents an object that can run backups. -type FakeResticBackupExec struct{} - -// RunBackup runs a Restic backup. -func (exec FakeResticBackupExec) RunBackup(cmd *restic.Command, log logrus.FieldLogger, updateFn func(velerov1api.PodVolumeOperationProgress)) (string, string, error) { - return "", "", nil -} - -// GetSnapshotID gets the Restic snapshot ID. -func (exec FakeResticBackupExec) GetSnapshotID(cmd *restic.Command) (string, error) { - return "", nil -} \ No newline at end of file diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go index c3ab2d10e..fc5a71fa7 100644 --- a/pkg/uploader/kopia/snapshot.go +++ b/pkg/uploader/kopia/snapshot.go @@ -18,6 +18,7 @@ package kopia import ( "context" + "io/ioutil" "math" "os" "path/filepath" @@ -83,13 +84,21 @@ func setupDefaultPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceIn //Backup backup specific sourcePath and update progress func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, - parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) { + parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { if fsUploader == nil { - return nil, errors.New("get empty kopia uploader") + return nil, false, errors.New("get empty kopia uploader") } dir, err := filepath.Abs(sourcePath) if err != nil { - return nil, errors.Wrapf(err, "Invalid source path '%s'", sourcePath) + return nil, false, errors.Wrapf(err, "Invalid source path '%s'", sourcePath) + } + + // to be consistent with restic when backup empty dir returns one error for upper logic handle + dirs, err := ioutil.ReadDir(dir) + if err != nil { + return nil, false, errors.Wrapf(err, "Unable to read dir in path %s", dir) + } else if len(dirs) == 0 { + return nil, true, nil } sourceInfo := snapshot.SourceInfo{ @@ -97,14 +106,13 @@ func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter rep Host: udmrepo.GetRepoDomain(), Path: filepath.Clean(dir), } - rootDir, err := getLocalFSEntry(sourceInfo.Path) if err != nil { - return nil, errors.Wrap(err, "Unable to get local filesystem entry") + return nil, false, errors.Wrap(err, "Unable to get local filesystem entry") } snapID, snapshotSize, err := SnapshotSource(ctx, repoWriter, fsUploader, sourceInfo, rootDir, parentSnapshot, log, "Kopia Uploader") if err != nil { - return nil, err + return nil, false, err } snapshotInfo := &uploader.SnapshotInfo{ @@ -112,7 +120,7 @@ func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter rep Size: snapshotSize, } - return snapshotInfo, nil + return snapshotInfo, false, nil } func getLocalFSEntry(path0 string) (fs.Entry, error) { diff --git a/pkg/uploader/provider/kopia.go b/pkg/uploader/provider/kopia.go index 73798d9ac..9890a8a91 100644 --- a/pkg/uploader/provider/kopia.go +++ b/pkg/uploader/provider/kopia.go @@ -101,15 +101,16 @@ func (kp *kopiaProvider) Close(ctx context.Context) error { return kp.bkRepo.Close(ctx) } -//RunBackup which will backup specific path and update backup progress +// RunBackup which will backup specific path and update backup progress +// return snapshotID, isEmptySnapshot, error func (kp *kopiaProvider) RunBackup( ctx context.Context, path string, tags map[string]string, parentSnapshot string, - updater uploader.ProgressUpdater) (string, error) { + updater uploader.ProgressUpdater) (string, bool, error) { if updater == nil { - return "", errors.New("Need to initial backup progress updater first") + return "", false, errors.New("Need to initial backup progress updater first") } log := kp.log.WithFields(logrus.Fields{ @@ -130,13 +131,16 @@ func (kp *kopiaProvider) RunBackup( close(quit) }() - snapshotInfo, err := BackupFunc(ctx, kpUploader, repoWriter, path, parentSnapshot, log) - + snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, parentSnapshot, log) if err != nil { - return "", errors.Wrapf(err, "Failed to run kopia backup") + return "", false, errors.Wrapf(err, "Failed to run kopia backup") + } else if isSnapshotEmpty { + log.Debugf("Kopia backup got empty dir with path %s", path) + return "", true, nil } else if snapshotInfo == nil { - return "", fmt.Errorf("failed to get kopia backup snapshot info for path %v", path) + return "", false, fmt.Errorf("failed to get kopia backup snapshot info for path %v", path) } + // which ensure that the statistic data of TotalBytes equal to BytesDone when finished updater.UpdateProgress( &uploader.UploaderProgress{ @@ -146,7 +150,7 @@ func (kp *kopiaProvider) RunBackup( ) log.Debugf("Kopia backup finished, snapshot ID %s, backup size %d", snapshotInfo.ID, snapshotInfo.Size) - return snapshotInfo.ID, nil + return snapshotInfo.ID, false, nil } func (kp *kopiaProvider) GetPassword(param interface{}) (string, error) { diff --git a/pkg/uploader/provider/kopia_test.go b/pkg/uploader/provider/kopia_test.go index ac061d6c7..955bf83f4 100644 --- a/pkg/uploader/provider/kopia_test.go +++ b/pkg/uploader/provider/kopia_test.go @@ -40,27 +40,27 @@ func TestRunBackup(t *testing.T) { updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)} testCases := []struct { name string - hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) + hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) notError bool }{ { name: "success to backup", - hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) { - return &uploader.SnapshotInfo{}, nil + hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { + return &uploader.SnapshotInfo{}, false, nil }, notError: true, }, { name: "get error to backup", - hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) { - return &uploader.SnapshotInfo{}, errors.New("failed to backup") + hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { + return &uploader.SnapshotInfo{}, false, errors.New("failed to backup") }, notError: false, }, { name: "got empty snapshot", - hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) { - return nil, nil + hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { + return nil, true, errors.New("snapshot is empty") }, notError: false, }, @@ -68,7 +68,7 @@ func TestRunBackup(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { BackupFunc = tc.hookBackupFunc - _, err := kp.RunBackup(context.Background(), "var", nil, "", &updater) + _, _, err := kp.RunBackup(context.Background(), "var", nil, "", &updater) if tc.notError { assert.NoError(t, err) } else { diff --git a/pkg/uploader/provider/provider.go b/pkg/uploader/provider/provider.go index cac542798..1a74214f1 100644 --- a/pkg/uploader/provider/provider.go +++ b/pkg/uploader/provider/provider.go @@ -38,14 +38,14 @@ const backupProgressCheckInterval = 10 * time.Second // Provider which is designed for one pod volumn to do the backup or restore type Provider interface { - // RunBackup which will do backup for one specific volumn and return snapshotID error + // RunBackup which will do backup for one specific volumn and return snapshotID, isSnapshotEmpty, error // updater is used for updating backup progress which implement by third-party RunBackup( ctx context.Context, path string, tags map[string]string, parentSnapshot string, - updater uploader.ProgressUpdater) (string, error) + updater uploader.ProgressUpdater) (string, bool, error) // RunRestore which will do restore for one specific volumn with given snapshot id and return error // updater is used for updating backup progress which implement by third-party RunRestore( @@ -78,8 +78,7 @@ func NewUploaderProvider( } return NewKopiaUploaderProvider(ctx, credGetter, backupRepo, log) } else { - err := provider.NewResticRepositoryProvider(credGetter.FromFile, nil, log).ConnectToRepo(ctx, provider.RepoParam{BackupLocation: bsl, BackupRepo: backupRepo}) - if err != nil { + if err := provider.NewResticRepositoryProvider(credGetter.FromFile, nil, log).ConnectToRepo(ctx, provider.RepoParam{BackupLocation: bsl, BackupRepo: backupRepo}); err != nil { return nil, errors.Wrap(err, "failed to connect repository") } return NewResticUploaderProvider(repoIdentifier, bsl, credGetter, repoKeySelector, log) diff --git a/pkg/uploader/provider/restic.go b/pkg/uploader/provider/restic.go index 1f9e18eba..549d39577 100644 --- a/pkg/uploader/provider/restic.go +++ b/pkg/uploader/provider/restic.go @@ -17,11 +17,10 @@ limitations under the License. package provider import ( - "bytes" "context" "fmt" "os" - "time" + "strings" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -31,25 +30,21 @@ import ( velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/restic" "github.com/vmware-tanzu/velero/pkg/uploader" - "github.com/vmware-tanzu/velero/pkg/util/exec" "github.com/vmware-tanzu/velero/pkg/util/filesystem" ) -// BackupFunc mainly used to make testing more convenient -var ResticBackupFunc = restic.BackupCommand -var ResticRunCMDFunc = exec.RunCommand -var ResticGetSnapshotSizeFunc = restic.GetSnapshotSize -var ResticGetVolumeSizeFunc = restic.GetVolumeSize -var ResticRestoreFunc = restic.RestoreCommand +// mainly used to make testing more convenient +var ResticBackupCMDFunc = restic.BackupCommand +var ResticRestoreCMDFunc = restic.RestoreCommand type resticProvider struct { repoIdentifier string credentialsFile string caCertFile string - repoEnv []string - backupTags map[string]string - log logrus.FieldLogger + cmdEnv []string + extraFlags []string bsl *velerov1api.BackupStorageLocation + log logrus.FieldLogger } func NewResticUploaderProvider( @@ -61,8 +56,8 @@ func NewResticUploaderProvider( ) (Provider, error) { provider := resticProvider{ repoIdentifier: repoIdentifier, - log: log, bsl: bsl, + log: log, } var err error @@ -79,38 +74,42 @@ func NewResticUploaderProvider( } } - provider.repoEnv, err = restic.CmdEnv(bsl, credGetter.FromFile) + provider.cmdEnv, err = restic.CmdEnv(bsl, credGetter.FromFile) if err != nil { return nil, errors.Wrap(err, "error generating repository cmnd env") } + // #4820: restrieve insecureSkipTLSVerify from BSL configuration for + // AWS plugin. If nothing is return, that means insecureSkipTLSVerify + // is not enable for Restic command. + skipTLSRet := restic.GetInsecureSkipTLSVerifyFromBSL(bsl, log) + if len(skipTLSRet) > 0 { + provider.extraFlags = append(provider.extraFlags, skipTLSRet) + } + return &provider, nil } -// Not implement yet -func (rp *resticProvider) Cancel() { -} - func (rp *resticProvider) Close(ctx context.Context) error { if err := os.Remove(rp.credentialsFile); err != nil { - return errors.Wrapf(err, "failed to remove file %s", rp.credentialsFile) + rp.log.Warnf("Failed to remove file %s with err %v", rp.credentialsFile, err) } if err := os.Remove(rp.caCertFile); err != nil { - return errors.Wrapf(err, "failed to remove file %s", rp.caCertFile) + rp.log.Warnf("Failed to remove file %s with err %v", rp.caCertFile, err) } return nil } // RunBackup runs a `backup` command and watches the output to provide -// progress updates to the caller. +// progress updates to the caller and return snapshotID, isEmptySnapshot, error func (rp *resticProvider) RunBackup( ctx context.Context, path string, tags map[string]string, parentSnapshot string, - updater uploader.ProgressUpdater) (string, error) { + updater uploader.ProgressUpdater) (string, bool, error) { if updater == nil { - return "", errors.New("Need to initial backup progress updater first") + return "", false, errors.New("Need to initial backup progress updater first") } log := rp.log.WithFields(logrus.Fields{ @@ -118,104 +117,33 @@ func (rp *resticProvider) RunBackup( "parentSnapshot": parentSnapshot, }) - // buffers for copying command stdout/err output into - stdoutBuf := new(bytes.Buffer) - stderrBuf := new(bytes.Buffer) - - // create a channel to signal when to end the goroutine scanning for progress - // updates - quit := make(chan struct{}) - - rp.backupTags = tags - backupCmd := ResticBackupFunc(rp.repoIdentifier, rp.credentialsFile, path, tags) - backupCmd.Env = rp.repoEnv + backupCmd := ResticBackupCMDFunc(rp.repoIdentifier, rp.credentialsFile, path, tags) + backupCmd.Env = rp.cmdEnv backupCmd.CACertFile = rp.caCertFile - + backupCmd.ExtraFlags = rp.extraFlags if parentSnapshot != "" { backupCmd.ExtraFlags = append(backupCmd.ExtraFlags, fmt.Sprintf("--parent=%s", parentSnapshot)) } - // #4820: restrieve insecureSkipTLSVerify from BSL configuration for - // AWS plugin. If nothing is return, that means insecureSkipTLSVerify - // is not enable for Restic command. - skipTLSRet := restic.GetInsecureSkipTLSVerifyFromBSL(rp.bsl, rp.log) - if len(skipTLSRet) > 0 { - backupCmd.ExtraFlags = append(backupCmd.ExtraFlags, skipTLSRet) - } - - cmd := backupCmd.Cmd() - cmd.Stdout = stdoutBuf - cmd.Stderr = stderrBuf - - err := cmd.Start() + summary, stderrBuf, err := restic.RunBackup(backupCmd, log, updater) if err != nil { - log.Errorf("failed to execute %v with stderr %v error %v", cmd, stderrBuf.String(), err) - return "", err - } - - go func() { - ticker := time.NewTicker(backupProgressCheckInterval) - for { - select { - case <-ticker.C: - lastLine := restic.GetLastLine(stdoutBuf.Bytes()) - if len(lastLine) > 0 { - stat, err := restic.DecodeBackupStatusLine(lastLine) - if err != nil { - rp.log.WithError(err).Errorf("error getting backup progress") - } - - // if the line contains a non-empty bytes_done field, we can update the - // caller with the progress - if stat.BytesDone != 0 { - updater.UpdateProgress(&uploader.UploaderProgress{ - TotalBytes: stat.TotalBytesProcessed, - BytesDone: stat.TotalBytesProcessed, - }) - } - } - case <-quit: - ticker.Stop() - return - } + if strings.Contains(err.Error(), "snapshot is empty") { + log.Debugf("Restic backup got empty dir with %s path", path) + return "", true, nil } - }() - - err = cmd.Wait() - if err != nil { - return "", errors.WithStack(fmt.Errorf("failed to wait execute %v with stderr %v error %v", cmd, stderrBuf.String(), err)) + return "", false, errors.WithStack(fmt.Errorf("error running restic backup with error: %v", err)) } - quit <- struct{}{} - - summary, err := restic.GetSummaryLine(stdoutBuf.Bytes()) - if err != nil { - return "", errors.WithStack(fmt.Errorf("failed to get summary %v with error %v", stderrBuf.String(), err)) - } - stat, err := restic.DecodeBackupStatusLine(summary) - if err != nil { - return "", errors.WithStack(fmt.Errorf("failed to decode summary %v with error %v", string(summary), err)) - } - if stat.MessageType != "summary" { - return "", errors.WithStack(fmt.Errorf("error getting backup summary: %s", string(summary))) - } - - // update progress to 100% - updater.UpdateProgress(&uploader.UploaderProgress{ - TotalBytes: stat.TotalBytesProcessed, - BytesDone: stat.TotalBytesProcessed, - }) - - //GetSnapshtotID - snapshotIdCmd := restic.GetSnapshotCommand(rp.repoIdentifier, rp.credentialsFile, rp.backupTags) - snapshotIdCmd.Env = rp.repoEnv + // GetSnapshotID + snapshotIdCmd := restic.GetSnapshotCommand(rp.repoIdentifier, rp.credentialsFile, tags) + snapshotIdCmd.Env = rp.cmdEnv snapshotIdCmd.CACertFile = rp.caCertFile snapshotID, err := restic.GetSnapshotID(snapshotIdCmd) if err != nil { - return "", errors.WithStack(fmt.Errorf("error getting snapshot id with error: %s", err)) + return "", false, errors.WithStack(fmt.Errorf("error getting snapshot id with error: %v", err)) } - log.Debugf("Ran command=%s, stdout=%s, stderr=%s", cmd.String(), summary, stderrBuf.String()) - return snapshotID, nil + log.Debugf("Run command=%s, stdout=%s, stderr=%s", backupCmd.String(), summary, stderrBuf) + return snapshotID, false, nil } // RunRestore runs a `restore` command and monitors the volume size to @@ -225,68 +153,21 @@ func (rp *resticProvider) RunRestore( snapshotID string, volumePath string, updater uploader.ProgressUpdater) error { + if updater == nil { + return errors.New("Need to initial backup progress updater first") + } log := rp.log.WithFields(logrus.Fields{ "snapshotID": snapshotID, "volumePath": volumePath, }) - restoreCmd := ResticRestoreFunc(rp.repoIdentifier, rp.credentialsFile, snapshotID, volumePath) - restoreCmd.Env = rp.repoEnv + restoreCmd := ResticRestoreCMDFunc(rp.repoIdentifier, rp.credentialsFile, snapshotID, volumePath) + restoreCmd.Env = rp.cmdEnv restoreCmd.CACertFile = rp.caCertFile + restoreCmd.ExtraFlags = rp.extraFlags - // #4820: restrieve insecureSkipTLSVerify from BSL configuration for - // AWS plugin. If nothing is return, that means insecureSkipTLSVerify - // is not enable for Restic command. - skipTLSRet := restic.GetInsecureSkipTLSVerifyFromBSL(rp.bsl, log) - if len(skipTLSRet) > 0 { - restoreCmd.ExtraFlags = append(restoreCmd.ExtraFlags, skipTLSRet) - } - snapshotSize, err := ResticGetSnapshotSizeFunc(restoreCmd.RepoIdentifier, restoreCmd.PasswordFile, restoreCmd.CACertFile, restoreCmd.Args[0], restoreCmd.Env, skipTLSRet) - if err != nil { - return errors.Wrap(err, "error getting snapshot size") - } + stdout, stderr, err := restic.RunRestore(restoreCmd, log, updater) - updater.UpdateProgress(&uploader.UploaderProgress{ - TotalBytes: snapshotSize, - }) - - // create a channel to signal when to end the goroutine scanning for progress - // updates - quit := make(chan struct{}) - - go func() { - ticker := time.NewTicker(restoreProgressCheckInterval) - for { - select { - case <-ticker.C: - volumeSize, err := ResticGetVolumeSizeFunc(restoreCmd.Dir) - if err != nil { - log.WithError(err).Errorf("error getting volume size for restore dir %v", restoreCmd.Dir) - return - } - if volumeSize != 0 { - updater.UpdateProgress(&uploader.UploaderProgress{ - TotalBytes: snapshotSize, - BytesDone: volumeSize, - }) - } - case <-quit: - ticker.Stop() - return - } - } - }() - - stdout, stderr, err := ResticRunCMDFunc(restoreCmd.Cmd()) - quit <- struct{}{} - if err != nil { - return errors.Wrap(err, fmt.Sprintf("failed to execute restore command %v with stderr %v", restoreCmd.Cmd().String(), stderr)) - } - // update progress to 100% - updater.UpdateProgress(&uploader.UploaderProgress{ - TotalBytes: snapshotSize, - BytesDone: snapshotSize, - }) - log.Debugf("Ran command=%s, stdout=%s, stderr=%s", restoreCmd.Command, stdout, stderr) + log.Debugf("Run command=%s, stdout=%s, stderr=%s", restoreCmd.Command, stdout, stderr) return err } diff --git a/pkg/uploader/provider/restic_test.go b/pkg/uploader/provider/restic_test.go index 22b44c2ac..042602777 100644 --- a/pkg/uploader/provider/restic_test.go +++ b/pkg/uploader/provider/restic_test.go @@ -18,7 +18,6 @@ package provider import ( "context" - "errors" "strings" "testing" @@ -29,6 +28,7 @@ import ( velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme" "github.com/vmware-tanzu/velero/pkg/restic" + "github.com/vmware-tanzu/velero/pkg/uploader" ) func TestResticRunBackup(t *testing.T) { @@ -36,9 +36,10 @@ func TestResticRunBackup(t *testing.T) { rp.log = logrus.New() updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: rp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)} testCases := []struct { - name string - hookBackupFunc func(repoIdentifier string, passwordFile string, path string, tags map[string]string) *restic.Command - errorHandleFunc func(err error) bool + name string + hookBackupFunc func(repoIdentifier string, passwordFile string, path string, tags map[string]string) *restic.Command + hookRunBackupFunc func(backupCmd *restic.Command, log logrus.FieldLogger, updater uploader.ProgressUpdater) (string, string, error) + errorHandleFunc func(err error) bool }{ { name: "wrong restic execute command", @@ -62,71 +63,40 @@ func TestResticRunBackup(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - ResticBackupFunc = tc.hookBackupFunc - _, err := rp.RunBackup(context.Background(), "var", nil, "", &updater) + ResticBackupCMDFunc = tc.hookBackupFunc + _, _, err := rp.RunBackup(context.Background(), "var", nil, "", &updater) rp.log.Infof("test name %v error %v", tc.name, err) require.Equal(t, true, tc.errorHandleFunc(err)) }) } } + func TestResticRunRestore(t *testing.T) { var rp resticProvider rp.log = logrus.New() updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: rp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)} - ResticRestoreFunc = func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command { + ResticRestoreCMDFunc = func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command { return &restic.Command{Args: []string{""}} } testCases := []struct { - name string - hookResticGetVolumeSizeFunc func(path string) (int64, error) - hookResticGetSnapshotSizeFunc func(repoIdentifier, passwordFile, caCertFile, snapshotID string, env []string, insecureTLS string) (int64, error) - hookResticRestoreFunc func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command - errorHandleFunc func(err error) bool + name string + hookResticRestoreFunc func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command + errorHandleFunc func(err error) bool }{ { - name: "failed to get snapshot", - hookResticGetVolumeSizeFunc: func(path string) (int64, error) { return 100, nil }, - hookResticGetSnapshotSizeFunc: func(repoIdentifier, passwordFile, caCertFile, snapshotID string, env []string, insecureTLS string) (int64, error) { - return 100, errors.New("failed to get snapshot") - }, - hookResticRestoreFunc: func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command { - return &restic.Command{Args: []string{""}} - }, - errorHandleFunc: func(err error) bool { return strings.Contains(err.Error(), "failed to get snapshot") }, - }, - { - name: "failed to get volume size", - hookResticGetVolumeSizeFunc: func(path string) (int64, error) { return 100, errors.New("failed to get volume size") }, - hookResticGetSnapshotSizeFunc: func(repoIdentifier, passwordFile, caCertFile, snapshotID string, env []string, insecureTLS string) (int64, error) { - return 100, nil - }, - hookResticRestoreFunc: func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command { - return &restic.Command{Args: []string{""}} - }, - errorHandleFunc: func(err error) bool { - return strings.Contains(err.Error(), "failed to execute restore command restic") - }, - }, - { - name: "wrong restic execute command", - hookResticGetVolumeSizeFunc: func(path string) (int64, error) { return 100, nil }, - hookResticGetSnapshotSizeFunc: func(repoIdentifier, passwordFile, caCertFile, snapshotID string, env []string, insecureTLS string) (int64, error) { - return 100, nil - }, + name: "wrong restic execute command", hookResticRestoreFunc: func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command { return &restic.Command{Args: []string{"date"}} }, errorHandleFunc: func(err error) bool { - return strings.Contains(err.Error(), "failed to execute restore command restic") + return strings.Contains(err.Error(), "executable file not found ") }, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - ResticGetSnapshotSizeFunc = tc.hookResticGetSnapshotSizeFunc - ResticGetVolumeSizeFunc = tc.hookResticGetVolumeSizeFunc - ResticRestoreFunc = tc.hookResticRestoreFunc + ResticRestoreCMDFunc = tc.hookResticRestoreFunc err := rp.RunRestore(context.Background(), "", "var", &updater) rp.log.Infof("test name %v error %v", tc.name, err) require.Equal(t, true, tc.errorHandleFunc(err))