mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-01-06 21:36:30 +00:00
@@ -1,59 +0,0 @@
|
||||
/*
|
||||
Copyright the Velero contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package builder
|
||||
|
||||
import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
)
|
||||
|
||||
// BackupRepositoryBuilder builds BackupRepository objects.
|
||||
type BackupRepositoryBuilder struct {
|
||||
object *velerov1api.BackupRepository
|
||||
}
|
||||
|
||||
// ForBackupRepository is the constructor for a BackupRepositoryBuilder.
|
||||
func ForBackupRepository(ns, name string) *BackupRepositoryBuilder {
|
||||
return &BackupRepositoryBuilder{
|
||||
object: &velerov1api.BackupRepository{
|
||||
Spec: velerov1api.BackupRepositorySpec{ResticIdentifier: ""},
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: velerov1api.SchemeGroupVersion.String(),
|
||||
Kind: "BackupRepository",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: ns,
|
||||
Name: name,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Result returns the built BackupRepository.
|
||||
func (b *BackupRepositoryBuilder) Result() *velerov1api.BackupRepository {
|
||||
return b.object
|
||||
}
|
||||
|
||||
// ObjectMeta applies functional options to the BackupRepository's ObjectMeta.
|
||||
func (b *BackupRepositoryBuilder) ObjectMeta(opts ...ObjectMetaOpt) *BackupRepositoryBuilder {
|
||||
for _, opt := range opts {
|
||||
opt(b.object)
|
||||
}
|
||||
|
||||
return b
|
||||
}
|
||||
@@ -19,7 +19,6 @@ package controller
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@@ -27,6 +26,7 @@ import (
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
@@ -34,8 +34,10 @@ import (
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/label"
|
||||
"github.com/vmware-tanzu/velero/pkg/metrics"
|
||||
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/util"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader/provider"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
@@ -143,30 +145,17 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
|
||||
}, backupLocation); err != nil {
|
||||
return ctrl.Result{}, errors.Wrap(err, "error getting backup storage location")
|
||||
}
|
||||
|
||||
// name of ResticRepository is generated with prefix volumeNamespace-backupLocation- and end with random characters
|
||||
// it could not retrieve the ResticRepository CR with namespace + name. so first list all CRs with in the volumeNamespace
|
||||
// then filtering the matched CR with prefix volumeNamespace-backupLocation-
|
||||
backupRepos := &velerov1api.BackupRepositoryList{}
|
||||
var backupRepo velerov1api.BackupRepository
|
||||
isFoundRepo := false
|
||||
if r.Client.List(ctx, backupRepos, &client.ListOptions{
|
||||
Namespace: pvb.Namespace,
|
||||
}); err != nil {
|
||||
selector := labels.SelectorFromSet(
|
||||
map[string]string{
|
||||
//TODO
|
||||
//velerov1api.VolumeNamespaceLabel: label.GetValidName(volumeNamespace),
|
||||
velerov1api.StorageLocationLabel: label.GetValidName(pvb.Spec.BackupStorageLocation),
|
||||
//velerov1api.RepositoryTypeLabel: label.GetValidName(repositoryType),
|
||||
},
|
||||
)
|
||||
backupRepo, err := util.GetBackupRepositoryByLabel(ctx, r.Client, pvb.Namespace, selector)
|
||||
if err != nil {
|
||||
return ctrl.Result{}, errors.Wrap(err, "error getting backup repository")
|
||||
} else if len(backupRepos.Items) == 0 {
|
||||
return ctrl.Result{}, errors.Errorf("find empty BackupRepository found for workload namespace %s, backup storage location %s", pvb.Namespace, pvb.Spec.BackupStorageLocation)
|
||||
} else {
|
||||
for _, repo := range backupRepos.Items {
|
||||
if strings.HasPrefix(repo.Name, fmt.Sprintf("%s-%s-", pvb.Spec.Pod.Namespace, pvb.Spec.BackupStorageLocation)) {
|
||||
backupRepo = repo
|
||||
isFoundRepo = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !isFoundRepo {
|
||||
return ctrl.Result{}, errors.Errorf("could not found match BackupRepository for workload namespace %s, backup storage location %s", pvb.Namespace, pvb.Spec.BackupStorageLocation)
|
||||
}
|
||||
}
|
||||
|
||||
var uploaderProv provider.Provider
|
||||
@@ -177,17 +166,17 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
|
||||
}
|
||||
|
||||
// If this is a PVC, look for the most recent completed pod volume backup for it and get
|
||||
// its restic snapshot ID to use as the value of the `--parent` flag. Without this,
|
||||
// its snapshot ID to do new backup based on it. Without this,
|
||||
// if the pod using the PVC (and therefore the directory path under /host_pods/) has
|
||||
// changed since the PVC's last backup, restic will not be able to identify a suitable
|
||||
// changed since the PVC's last backup, for backup, it will not be able to identify a suitable
|
||||
// parent snapshot to use, and will have to do a full rescan of the contents of the PVC.
|
||||
var parentSnapshotID string
|
||||
if pvcUID, ok := pvb.Labels[velerov1api.PVCUIDLabel]; ok {
|
||||
parentSnapshotID = r.getParentSnapshot(ctx, log, pvb.Namespace, pvcUID, pvb.Spec.BackupStorageLocation)
|
||||
if parentSnapshotID == "" {
|
||||
log.Info("No parent snapshot found for PVC, not using --parent flag for this backup")
|
||||
log.Info("No parent snapshot found for PVC, not based on parent snapshot for this backup")
|
||||
} else {
|
||||
log.WithField("parentSnapshotID", parentSnapshotID).Info("Setting --parent flag for this backup")
|
||||
log.WithField("parentSnapshotID", parentSnapshotID).Info("Based on parent snapshot for this backup")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -197,14 +186,9 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
|
||||
}
|
||||
}()
|
||||
|
||||
var emptySnapshot bool
|
||||
snapshotID, err := uploaderProv.RunBackup(ctx, path, pvb.Spec.Tags, parentSnapshotID, r.NewBackupProgressUpdater(&pvb, log, ctx))
|
||||
snapshotID, emptySnapshot, err := uploaderProv.RunBackup(ctx, path, pvb.Spec.Tags, parentSnapshotID, r.NewBackupProgressUpdater(&pvb, log, ctx))
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "snapshot is empty") {
|
||||
emptySnapshot = true
|
||||
} else {
|
||||
return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("running Restic backup, stderr=%v", err), log)
|
||||
}
|
||||
return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("running backup, stderr=%v", err), log)
|
||||
}
|
||||
|
||||
// Update status to Completed with path & snapshot ID.
|
||||
@@ -224,7 +208,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
|
||||
latencyDuration := pvb.Status.CompletionTimestamp.Time.Sub(pvb.Status.StartTimestamp.Time)
|
||||
latencySeconds := float64(latencyDuration / time.Second)
|
||||
backupName := fmt.Sprintf("%s/%s", req.Namespace, pvb.OwnerReferences[0].Name)
|
||||
generateOpName := fmt.Sprintf("%s-%s-%s-%s-backup", pvb.Name, backupRepo.Name, pvb.Spec.BackupStorageLocation, pvb.Namespace)
|
||||
generateOpName := fmt.Sprintf("%s-%s-%s-%s-%s-backup", pvb.Name, backupRepo.Name, pvb.Spec.BackupStorageLocation, pvb.Namespace, pvb.Spec.UploaderType)
|
||||
r.Metrics.ObserveResticOpLatency(r.NodeName, req.Name, generateOpName, backupName, latencySeconds)
|
||||
r.Metrics.RegisterResticOpLatencyGauge(r.NodeName, req.Name, generateOpName, backupName, latencySeconds)
|
||||
r.Metrics.RegisterPodVolumeBackupDequeue(r.NodeName)
|
||||
|
||||
@@ -71,8 +71,21 @@ func bslBuilder() *builder.BackupStorageLocationBuilder {
|
||||
ForBackupStorageLocation(velerov1api.DefaultNamespace, "bsl-loc")
|
||||
}
|
||||
|
||||
func backupRepoBuilder() *builder.BackupRepositoryBuilder {
|
||||
return builder.ForBackupRepository(velerov1api.DefaultNamespace, fmt.Sprintf("%s-bsl-loc-dn24h", velerov1api.DefaultNamespace))
|
||||
func buildBackupRepo() *velerov1api.BackupRepository {
|
||||
return &velerov1api.BackupRepository{
|
||||
Spec: velerov1api.BackupRepositorySpec{ResticIdentifier: ""},
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: velerov1api.SchemeGroupVersion.String(),
|
||||
Kind: "BackupRepository",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: velerov1api.DefaultNamespace,
|
||||
Name: fmt.Sprintf("%s-bsl-loc-dn24h", velerov1api.DefaultNamespace),
|
||||
Labels: map[string]string{
|
||||
velerov1api.StorageLocationLabel: "bsl-loc",
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
var _ = Describe("PodVolumeBackup Reconciler", func() {
|
||||
@@ -176,7 +189,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
|
||||
pvb: pvbBuilder().Phase("").Node("test_node").Result(),
|
||||
pod: podBuilder().Result(),
|
||||
bsl: bslBuilder().Result(),
|
||||
backupRepo: backupRepoBuilder().Result(),
|
||||
backupRepo: buildBackupRepo(),
|
||||
expectedProcessed: true,
|
||||
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
|
||||
Phase(velerov1api.PodVolumeBackupPhaseCompleted).
|
||||
@@ -190,7 +203,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
|
||||
Result(),
|
||||
pod: podBuilder().Result(),
|
||||
bsl: bslBuilder().Result(),
|
||||
backupRepo: backupRepoBuilder().Result(),
|
||||
backupRepo: buildBackupRepo(),
|
||||
expectedProcessed: true,
|
||||
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
|
||||
Phase(velerov1api.PodVolumeBackupPhaseCompleted).
|
||||
@@ -204,7 +217,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
|
||||
Result(),
|
||||
pod: podBuilder().Result(),
|
||||
bsl: bslBuilder().Result(),
|
||||
backupRepo: backupRepoBuilder().Result(),
|
||||
backupRepo: buildBackupRepo(),
|
||||
expectedProcessed: false,
|
||||
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
|
||||
Phase(velerov1api.PodVolumeBackupPhaseInProgress).
|
||||
@@ -218,7 +231,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
|
||||
Result(),
|
||||
pod: podBuilder().Result(),
|
||||
bsl: bslBuilder().Result(),
|
||||
backupRepo: backupRepoBuilder().Result(),
|
||||
backupRepo: buildBackupRepo(),
|
||||
expectedProcessed: false,
|
||||
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
|
||||
Phase(velerov1api.PodVolumeBackupPhaseCompleted).
|
||||
@@ -232,7 +245,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
|
||||
Result(),
|
||||
pod: podBuilder().Result(),
|
||||
bsl: bslBuilder().Result(),
|
||||
backupRepo: backupRepoBuilder().Result(),
|
||||
backupRepo: buildBackupRepo(),
|
||||
expectedProcessed: false,
|
||||
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
|
||||
Phase(velerov1api.PodVolumeBackupPhaseFailed).
|
||||
@@ -246,7 +259,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
|
||||
Result(),
|
||||
pod: podBuilder().Result(),
|
||||
bsl: bslBuilder().Result(),
|
||||
backupRepo: backupRepoBuilder().Result(),
|
||||
backupRepo: buildBackupRepo(),
|
||||
expectedProcessed: false,
|
||||
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
|
||||
Phase(velerov1api.PodVolumeBackupPhaseFailed).
|
||||
@@ -260,7 +273,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
|
||||
Result(),
|
||||
pod: podBuilder().Result(),
|
||||
bsl: bslBuilder().Result(),
|
||||
backupRepo: backupRepoBuilder().Result(),
|
||||
backupRepo: buildBackupRepo(),
|
||||
expectedProcessed: false,
|
||||
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
|
||||
Phase(velerov1api.PodVolumeBackupPhaseNew).
|
||||
@@ -274,7 +287,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
|
||||
Result(),
|
||||
pod: podBuilder().Result(),
|
||||
bsl: bslBuilder().Result(),
|
||||
backupRepo: backupRepoBuilder().Result(),
|
||||
backupRepo: buildBackupRepo(),
|
||||
expectedProcessed: false,
|
||||
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
|
||||
Phase(velerov1api.PodVolumeBackupPhaseInProgress).
|
||||
@@ -288,7 +301,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
|
||||
Result(),
|
||||
pod: podBuilder().Result(),
|
||||
bsl: bslBuilder().Result(),
|
||||
backupRepo: backupRepoBuilder().Result(),
|
||||
backupRepo: buildBackupRepo(),
|
||||
expectedProcessed: false,
|
||||
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
|
||||
Phase(velerov1api.PodVolumeBackupPhaseCompleted).
|
||||
@@ -302,7 +315,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
|
||||
Result(),
|
||||
pod: podBuilder().Result(),
|
||||
bsl: bslBuilder().Result(),
|
||||
backupRepo: backupRepoBuilder().Result(),
|
||||
backupRepo: buildBackupRepo(),
|
||||
expectedProcessed: false,
|
||||
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
|
||||
Phase(velerov1api.PodVolumeBackupPhaseFailed).
|
||||
@@ -320,8 +333,8 @@ func (f *fakeProvider) RunBackup(
|
||||
path string,
|
||||
tags map[string]string,
|
||||
parentSnapshot string,
|
||||
updater uploader.ProgressUpdater) (string, error) {
|
||||
return "", nil
|
||||
updater uploader.ProgressUpdater) (string, bool, error) {
|
||||
return "", false, nil
|
||||
}
|
||||
|
||||
func (f *fakeProvider) RunRestore(
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -40,9 +39,10 @@ import (
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/label"
|
||||
"github.com/vmware-tanzu/velero/pkg/podvolume"
|
||||
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
|
||||
"github.com/vmware-tanzu/velero/pkg/restic"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/util"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader/provider"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
|
||||
@@ -250,29 +250,17 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve
|
||||
return errors.Wrap(err, "error getting backup storage location")
|
||||
}
|
||||
|
||||
// name of ResticRepository is generated with prefix volumeNamespace-backupLocation- and end with random characters
|
||||
// it could not retrieve the ResticRepository CR with namespace + name. so first list all CRs with in the volumeNamespace
|
||||
// then filtering the matched CR with prefix volumeNamespace-backupLocation-
|
||||
backupRepos := &velerov1api.BackupRepositoryList{}
|
||||
var backupRepo velerov1api.BackupRepository
|
||||
isFoundRepo := false
|
||||
if c.List(ctx, backupRepos, &client.ListOptions{
|
||||
Namespace: req.Namespace,
|
||||
}); err != nil {
|
||||
selector := labels.SelectorFromSet(
|
||||
map[string]string{
|
||||
//TODO
|
||||
//velerov1api.VolumeNamespaceLabel: label.GetValidName(volumeNamespace),
|
||||
velerov1api.StorageLocationLabel: label.GetValidName(req.Spec.BackupStorageLocation),
|
||||
//velerov1api.RepositoryTypeLabel: label.GetValidName(repositoryType),
|
||||
},
|
||||
)
|
||||
backupRepo, err := util.GetBackupRepositoryByLabel(ctx, c.Client, req.Namespace, selector)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error getting backup repository")
|
||||
} else if len(backupRepos.Items) == 0 {
|
||||
return errors.Errorf("find empty BackupRepository found for workload namespace %s, backup storage location %s", req.Namespace, req.Spec.BackupStorageLocation)
|
||||
} else {
|
||||
for _, repo := range backupRepos.Items {
|
||||
if strings.HasPrefix(repo.Name, fmt.Sprintf("%s-%s-", req.Spec.Pod.Namespace, req.Spec.BackupStorageLocation)) {
|
||||
backupRepo = repo
|
||||
isFoundRepo = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !isFoundRepo {
|
||||
return errors.Errorf("could not found match BackupRepository for workload namespace %s, backup storage location %s", req.Namespace, req.Spec.BackupStorageLocation)
|
||||
}
|
||||
}
|
||||
|
||||
uploaderProv, err := provider.NewUploaderProvider(ctx, c.Client, req.Spec.UploaderType,
|
||||
@@ -288,7 +276,7 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve
|
||||
}()
|
||||
|
||||
if err = uploaderProv.RunRestore(ctx, req.Spec.SnapshotID, volumePath, c.NewRestoreProgressUpdater(req, log, ctx)); err != nil {
|
||||
return errors.Wrapf(err, "error running restic restore err=%v", err)
|
||||
return errors.Wrapf(err, "error running restore err=%v", err)
|
||||
}
|
||||
|
||||
// Remove the .velero directory from the restored volume (it may contain done files from previous restores
|
||||
|
||||
45
pkg/repository/util/backup_repo_op.go
Normal file
45
pkg/repository/util/backup_repo_op.go
Normal file
@@ -0,0 +1,45 @@
|
||||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
)
|
||||
|
||||
// GetBackupRepositoryByLabel which find backup repository through pvbNamespace, label
|
||||
// name of BackupRepository is generated with prefix volumeNamespace-backupLocation- and end with random characters
|
||||
// it could not retrieve the BackupRepository CR with namespace + name. so first list all CRs with in the pvbNamespace
|
||||
// then filtering the matched CR by label
|
||||
func GetBackupRepositoryByLabel(ctx context.Context, cli client.Client, pvbNamespace string, selector labels.Selector) (velerov1api.BackupRepository, error) {
|
||||
backupRepoList := &velerov1api.BackupRepositoryList{}
|
||||
if err := cli.List(ctx, backupRepoList, &client.ListOptions{
|
||||
Namespace: pvbNamespace,
|
||||
LabelSelector: selector,
|
||||
}); err != nil {
|
||||
return velerov1api.BackupRepository{}, errors.Wrap(err, "error getting backup repository list")
|
||||
} else if len(backupRepoList.Items) == 1 {
|
||||
return backupRepoList.Items[0], nil
|
||||
} else {
|
||||
return velerov1api.BackupRepository{}, errors.Errorf("unexpectedly find %d BackupRepository for workload namespace %s with label selector %v", len(backupRepoList.Items), pvbNamespace, selector)
|
||||
}
|
||||
}
|
||||
@@ -20,10 +20,13 @@ import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/exec"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
)
|
||||
@@ -66,7 +69,82 @@ func GetSnapshotID(snapshotIdCmd *Command) (string, error) {
|
||||
return snapshots[0].ShortID, nil
|
||||
}
|
||||
|
||||
func DecodeBackupStatusLine(lastLine []byte) (backupStatusLine, error) {
|
||||
// RunBackup runs a `restic backup` command and watches the output to provide
|
||||
// progress updates to the caller.
|
||||
func RunBackup(backupCmd *Command, log logrus.FieldLogger, updater uploader.ProgressUpdater) (string, string, error) {
|
||||
// buffers for copying command stdout/err output into
|
||||
stdoutBuf := new(bytes.Buffer)
|
||||
stderrBuf := new(bytes.Buffer)
|
||||
|
||||
// create a channel to signal when to end the goroutine scanning for progress
|
||||
// updates
|
||||
quit := make(chan struct{})
|
||||
|
||||
cmd := backupCmd.Cmd()
|
||||
cmd.Stdout = stdoutBuf
|
||||
cmd.Stderr = stderrBuf
|
||||
|
||||
err := cmd.Start()
|
||||
if err != nil {
|
||||
return stdoutBuf.String(), stderrBuf.String(), err
|
||||
}
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(backupProgressCheckInterval)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
lastLine := getLastLine(stdoutBuf.Bytes())
|
||||
if len(lastLine) > 0 {
|
||||
stat, err := decodeBackupStatusLine(lastLine)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("error getting restic backup progress")
|
||||
}
|
||||
|
||||
// if the line contains a non-empty bytes_done field, we can update the
|
||||
// caller with the progress
|
||||
if stat.BytesDone != 0 {
|
||||
updater.UpdateProgress(&uploader.UploaderProgress{
|
||||
TotalBytes: stat.TotalBytesProcessed,
|
||||
BytesDone: stat.TotalBytesProcessed,
|
||||
})
|
||||
}
|
||||
}
|
||||
case <-quit:
|
||||
ticker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
err = cmd.Wait()
|
||||
if err != nil {
|
||||
return stdoutBuf.String(), stderrBuf.String(), err
|
||||
}
|
||||
quit <- struct{}{}
|
||||
|
||||
summary, err := getSummaryLine(stdoutBuf.Bytes())
|
||||
if err != nil {
|
||||
return stdoutBuf.String(), stderrBuf.String(), err
|
||||
}
|
||||
stat, err := decodeBackupStatusLine(summary)
|
||||
if err != nil {
|
||||
return stdoutBuf.String(), stderrBuf.String(), err
|
||||
}
|
||||
if stat.MessageType != "summary" {
|
||||
return stdoutBuf.String(), stderrBuf.String(), errors.WithStack(fmt.Errorf("error getting restic backup summary: %s", string(summary)))
|
||||
}
|
||||
|
||||
// update progress to 100%
|
||||
updater.UpdateProgress(&uploader.UploaderProgress{
|
||||
TotalBytes: stat.TotalBytesProcessed,
|
||||
BytesDone: stat.TotalBytesProcessed,
|
||||
})
|
||||
|
||||
return string(summary), stderrBuf.String(), nil
|
||||
}
|
||||
|
||||
func decodeBackupStatusLine(lastLine []byte) (backupStatusLine, error) {
|
||||
var stat backupStatusLine
|
||||
if err := json.Unmarshal(lastLine, &stat); err != nil {
|
||||
return stat, errors.Wrapf(err, "unable to decode backup JSON line: %s", string(lastLine))
|
||||
@@ -74,10 +152,10 @@ func DecodeBackupStatusLine(lastLine []byte) (backupStatusLine, error) {
|
||||
return stat, nil
|
||||
}
|
||||
|
||||
// GetLastLine returns the last line of a byte array. The string is assumed to
|
||||
// getLastLine returns the last line of a byte array. The string is assumed to
|
||||
// have a newline at the end of it, so this returns the substring between the
|
||||
// last two newlines.
|
||||
func GetLastLine(b []byte) []byte {
|
||||
func getLastLine(b []byte) []byte {
|
||||
if b == nil || len(b) == 0 {
|
||||
return []byte("")
|
||||
}
|
||||
@@ -86,12 +164,12 @@ func GetLastLine(b []byte) []byte {
|
||||
return b[lastNewLineIdx+1 : len(b)-1]
|
||||
}
|
||||
|
||||
// GetSummaryLine looks for the summary JSON line
|
||||
// getSummaryLine looks for the summary JSON line
|
||||
// (`{"message_type:"summary",...`) in the restic backup command output. Due to
|
||||
// an issue in Restic, this might not always be the last line
|
||||
// (https://github.com/restic/restic/issues/2389). It returns an error if it
|
||||
// can't be found.
|
||||
func GetSummaryLine(b []byte) ([]byte, error) {
|
||||
func getSummaryLine(b []byte) ([]byte, error) {
|
||||
summaryLineIdx := bytes.LastIndex(b, []byte(`{"message_type":"summary"`))
|
||||
if summaryLineIdx < 0 {
|
||||
return nil, errors.New("unable to find summary in restic backup command output")
|
||||
@@ -104,7 +182,66 @@ func GetSummaryLine(b []byte) ([]byte, error) {
|
||||
return b[summaryLineIdx : summaryLineIdx+newLineIdx], nil
|
||||
}
|
||||
|
||||
func GetSnapshotSize(repoIdentifier, passwordFile, caCertFile, snapshotID string, env []string, insecureTLS string) (int64, error) {
|
||||
// RunRestore runs a `restic restore` command and monitors the volume size to
|
||||
// provide progress updates to the caller.
|
||||
func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updater uploader.ProgressUpdater) (string, string, error) {
|
||||
insecureTLSFlag := ""
|
||||
|
||||
for _, extraFlag := range restoreCmd.ExtraFlags {
|
||||
if strings.Contains(extraFlag, resticInsecureTLSFlag) {
|
||||
insecureTLSFlag = extraFlag
|
||||
}
|
||||
}
|
||||
|
||||
snapshotSize, err := getSnapshotSize(restoreCmd.RepoIdentifier, restoreCmd.PasswordFile, restoreCmd.CACertFile, restoreCmd.Args[0], restoreCmd.Env, insecureTLSFlag)
|
||||
if err != nil {
|
||||
return "", "", errors.Wrap(err, "error getting snapshot size")
|
||||
}
|
||||
|
||||
updater.UpdateProgress(&uploader.UploaderProgress{
|
||||
TotalBytes: snapshotSize,
|
||||
})
|
||||
|
||||
// create a channel to signal when to end the goroutine scanning for progress
|
||||
// updates
|
||||
quit := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(restoreProgressCheckInterval)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
volumeSize, err := getVolumeSize(restoreCmd.Dir)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("error getting restic restore progress")
|
||||
}
|
||||
|
||||
if volumeSize != 0 {
|
||||
updater.UpdateProgress(&uploader.UploaderProgress{
|
||||
TotalBytes: snapshotSize,
|
||||
BytesDone: volumeSize,
|
||||
})
|
||||
}
|
||||
case <-quit:
|
||||
ticker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
stdout, stderr, err := exec.RunCommand(restoreCmd.Cmd())
|
||||
quit <- struct{}{}
|
||||
|
||||
// update progress to 100%
|
||||
updater.UpdateProgress(&uploader.UploaderProgress{
|
||||
TotalBytes: snapshotSize,
|
||||
BytesDone: snapshotSize,
|
||||
})
|
||||
|
||||
return stdout, stderr, err
|
||||
}
|
||||
|
||||
func getSnapshotSize(repoIdentifier, passwordFile, caCertFile, snapshotID string, env []string, insecureTLS string) (int64, error) {
|
||||
cmd := StatsCommand(repoIdentifier, passwordFile, snapshotID)
|
||||
cmd.Env = env
|
||||
cmd.CACertFile = caCertFile
|
||||
@@ -129,7 +266,7 @@ func GetSnapshotSize(repoIdentifier, passwordFile, caCertFile, snapshotID string
|
||||
return snapshotStats.TotalSize, nil
|
||||
}
|
||||
|
||||
func GetVolumeSize(path string) (int64, error) {
|
||||
func getVolumeSize(path string) (int64, error) {
|
||||
var size int64
|
||||
|
||||
files, err := fileSystem.ReadDir(path)
|
||||
@@ -139,7 +276,7 @@ func GetVolumeSize(path string) (int64, error) {
|
||||
|
||||
for _, file := range files {
|
||||
if file.IsDir() {
|
||||
s, err := GetVolumeSize(fmt.Sprintf("%s/%s", path, file.Name()))
|
||||
s, err := getVolumeSize(fmt.Sprintf("%s/%s", path, file.Name()))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ func Test_getSummaryLine(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
summary, err := GetSummaryLine([]byte(tt.output))
|
||||
summary, err := getSummaryLine([]byte(tt.output))
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
@@ -79,7 +79,7 @@ third line
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.want, func(t *testing.T) {
|
||||
assert.Equal(t, []byte(tt.want), GetLastLine([]byte(tt.output)))
|
||||
assert.Equal(t, []byte(tt.want), getLastLine([]byte(tt.output)))
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -103,7 +103,7 @@ func Test_getVolumeSize(t *testing.T) {
|
||||
fileSystem = fakefs
|
||||
defer func() { fileSystem = filesystem.NewFileSystem() }()
|
||||
|
||||
actualSize, err := GetVolumeSize("/")
|
||||
actualSize, err := getVolumeSize("/")
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, expectedSize, actualSize)
|
||||
|
||||
@@ -1,37 +0,0 @@
|
||||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/restic"
|
||||
)
|
||||
|
||||
// FakeResticBackupExec represents an object that can run backups.
|
||||
type FakeResticBackupExec struct{}
|
||||
|
||||
// RunBackup runs a Restic backup.
|
||||
func (exec FakeResticBackupExec) RunBackup(cmd *restic.Command, log logrus.FieldLogger, updateFn func(velerov1api.PodVolumeOperationProgress)) (string, string, error) {
|
||||
return "", "", nil
|
||||
}
|
||||
|
||||
// GetSnapshotID gets the Restic snapshot ID.
|
||||
func (exec FakeResticBackupExec) GetSnapshotID(cmd *restic.Command) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
@@ -18,6 +18,7 @@ package kopia
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -83,13 +84,21 @@ func setupDefaultPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceIn
|
||||
|
||||
//Backup backup specific sourcePath and update progress
|
||||
func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string,
|
||||
parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
|
||||
parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
|
||||
if fsUploader == nil {
|
||||
return nil, errors.New("get empty kopia uploader")
|
||||
return nil, false, errors.New("get empty kopia uploader")
|
||||
}
|
||||
dir, err := filepath.Abs(sourcePath)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "Invalid source path '%s'", sourcePath)
|
||||
return nil, false, errors.Wrapf(err, "Invalid source path '%s'", sourcePath)
|
||||
}
|
||||
|
||||
// to be consistent with restic when backup empty dir returns one error for upper logic handle
|
||||
dirs, err := ioutil.ReadDir(dir)
|
||||
if err != nil {
|
||||
return nil, false, errors.Wrapf(err, "Unable to read dir in path %s", dir)
|
||||
} else if len(dirs) == 0 {
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
sourceInfo := snapshot.SourceInfo{
|
||||
@@ -97,14 +106,13 @@ func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter rep
|
||||
Host: udmrepo.GetRepoDomain(),
|
||||
Path: filepath.Clean(dir),
|
||||
}
|
||||
|
||||
rootDir, err := getLocalFSEntry(sourceInfo.Path)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Unable to get local filesystem entry")
|
||||
return nil, false, errors.Wrap(err, "Unable to get local filesystem entry")
|
||||
}
|
||||
snapID, snapshotSize, err := SnapshotSource(ctx, repoWriter, fsUploader, sourceInfo, rootDir, parentSnapshot, log, "Kopia Uploader")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
snapshotInfo := &uploader.SnapshotInfo{
|
||||
@@ -112,7 +120,7 @@ func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter rep
|
||||
Size: snapshotSize,
|
||||
}
|
||||
|
||||
return snapshotInfo, nil
|
||||
return snapshotInfo, false, nil
|
||||
}
|
||||
|
||||
func getLocalFSEntry(path0 string) (fs.Entry, error) {
|
||||
|
||||
@@ -101,15 +101,16 @@ func (kp *kopiaProvider) Close(ctx context.Context) error {
|
||||
return kp.bkRepo.Close(ctx)
|
||||
}
|
||||
|
||||
//RunBackup which will backup specific path and update backup progress
|
||||
// RunBackup which will backup specific path and update backup progress
|
||||
// return snapshotID, isEmptySnapshot, error
|
||||
func (kp *kopiaProvider) RunBackup(
|
||||
ctx context.Context,
|
||||
path string,
|
||||
tags map[string]string,
|
||||
parentSnapshot string,
|
||||
updater uploader.ProgressUpdater) (string, error) {
|
||||
updater uploader.ProgressUpdater) (string, bool, error) {
|
||||
if updater == nil {
|
||||
return "", errors.New("Need to initial backup progress updater first")
|
||||
return "", false, errors.New("Need to initial backup progress updater first")
|
||||
}
|
||||
|
||||
log := kp.log.WithFields(logrus.Fields{
|
||||
@@ -130,13 +131,16 @@ func (kp *kopiaProvider) RunBackup(
|
||||
close(quit)
|
||||
}()
|
||||
|
||||
snapshotInfo, err := BackupFunc(ctx, kpUploader, repoWriter, path, parentSnapshot, log)
|
||||
|
||||
snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, parentSnapshot, log)
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(err, "Failed to run kopia backup")
|
||||
return "", false, errors.Wrapf(err, "Failed to run kopia backup")
|
||||
} else if isSnapshotEmpty {
|
||||
log.Debugf("Kopia backup got empty dir with path %s", path)
|
||||
return "", true, nil
|
||||
} else if snapshotInfo == nil {
|
||||
return "", fmt.Errorf("failed to get kopia backup snapshot info for path %v", path)
|
||||
return "", false, fmt.Errorf("failed to get kopia backup snapshot info for path %v", path)
|
||||
}
|
||||
|
||||
// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
|
||||
updater.UpdateProgress(
|
||||
&uploader.UploaderProgress{
|
||||
@@ -146,7 +150,7 @@ func (kp *kopiaProvider) RunBackup(
|
||||
)
|
||||
|
||||
log.Debugf("Kopia backup finished, snapshot ID %s, backup size %d", snapshotInfo.ID, snapshotInfo.Size)
|
||||
return snapshotInfo.ID, nil
|
||||
return snapshotInfo.ID, false, nil
|
||||
}
|
||||
|
||||
func (kp *kopiaProvider) GetPassword(param interface{}) (string, error) {
|
||||
|
||||
@@ -40,27 +40,27 @@ func TestRunBackup(t *testing.T) {
|
||||
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)}
|
||||
testCases := []struct {
|
||||
name string
|
||||
hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error)
|
||||
hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error)
|
||||
notError bool
|
||||
}{
|
||||
{
|
||||
name: "success to backup",
|
||||
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
|
||||
return &uploader.SnapshotInfo{}, nil
|
||||
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
|
||||
return &uploader.SnapshotInfo{}, false, nil
|
||||
},
|
||||
notError: true,
|
||||
},
|
||||
{
|
||||
name: "get error to backup",
|
||||
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
|
||||
return &uploader.SnapshotInfo{}, errors.New("failed to backup")
|
||||
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
|
||||
return &uploader.SnapshotInfo{}, false, errors.New("failed to backup")
|
||||
},
|
||||
notError: false,
|
||||
},
|
||||
{
|
||||
name: "got empty snapshot",
|
||||
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
|
||||
return nil, nil
|
||||
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
|
||||
return nil, true, errors.New("snapshot is empty")
|
||||
},
|
||||
notError: false,
|
||||
},
|
||||
@@ -68,7 +68,7 @@ func TestRunBackup(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
BackupFunc = tc.hookBackupFunc
|
||||
_, err := kp.RunBackup(context.Background(), "var", nil, "", &updater)
|
||||
_, _, err := kp.RunBackup(context.Background(), "var", nil, "", &updater)
|
||||
if tc.notError {
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
|
||||
@@ -38,14 +38,14 @@ const backupProgressCheckInterval = 10 * time.Second
|
||||
|
||||
// Provider which is designed for one pod volumn to do the backup or restore
|
||||
type Provider interface {
|
||||
// RunBackup which will do backup for one specific volumn and return snapshotID error
|
||||
// RunBackup which will do backup for one specific volumn and return snapshotID, isSnapshotEmpty, error
|
||||
// updater is used for updating backup progress which implement by third-party
|
||||
RunBackup(
|
||||
ctx context.Context,
|
||||
path string,
|
||||
tags map[string]string,
|
||||
parentSnapshot string,
|
||||
updater uploader.ProgressUpdater) (string, error)
|
||||
updater uploader.ProgressUpdater) (string, bool, error)
|
||||
// RunRestore which will do restore for one specific volumn with given snapshot id and return error
|
||||
// updater is used for updating backup progress which implement by third-party
|
||||
RunRestore(
|
||||
@@ -78,8 +78,7 @@ func NewUploaderProvider(
|
||||
}
|
||||
return NewKopiaUploaderProvider(ctx, credGetter, backupRepo, log)
|
||||
} else {
|
||||
err := provider.NewResticRepositoryProvider(credGetter.FromFile, nil, log).ConnectToRepo(ctx, provider.RepoParam{BackupLocation: bsl, BackupRepo: backupRepo})
|
||||
if err != nil {
|
||||
if err := provider.NewResticRepositoryProvider(credGetter.FromFile, nil, log).ConnectToRepo(ctx, provider.RepoParam{BackupLocation: bsl, BackupRepo: backupRepo}); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to connect repository")
|
||||
}
|
||||
return NewResticUploaderProvider(repoIdentifier, bsl, credGetter, repoKeySelector, log)
|
||||
|
||||
@@ -17,11 +17,10 @@ limitations under the License.
|
||||
package provider
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -31,25 +30,21 @@ import (
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/restic"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/exec"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
)
|
||||
|
||||
// BackupFunc mainly used to make testing more convenient
|
||||
var ResticBackupFunc = restic.BackupCommand
|
||||
var ResticRunCMDFunc = exec.RunCommand
|
||||
var ResticGetSnapshotSizeFunc = restic.GetSnapshotSize
|
||||
var ResticGetVolumeSizeFunc = restic.GetVolumeSize
|
||||
var ResticRestoreFunc = restic.RestoreCommand
|
||||
// mainly used to make testing more convenient
|
||||
var ResticBackupCMDFunc = restic.BackupCommand
|
||||
var ResticRestoreCMDFunc = restic.RestoreCommand
|
||||
|
||||
type resticProvider struct {
|
||||
repoIdentifier string
|
||||
credentialsFile string
|
||||
caCertFile string
|
||||
repoEnv []string
|
||||
backupTags map[string]string
|
||||
log logrus.FieldLogger
|
||||
cmdEnv []string
|
||||
extraFlags []string
|
||||
bsl *velerov1api.BackupStorageLocation
|
||||
log logrus.FieldLogger
|
||||
}
|
||||
|
||||
func NewResticUploaderProvider(
|
||||
@@ -61,8 +56,8 @@ func NewResticUploaderProvider(
|
||||
) (Provider, error) {
|
||||
provider := resticProvider{
|
||||
repoIdentifier: repoIdentifier,
|
||||
log: log,
|
||||
bsl: bsl,
|
||||
log: log,
|
||||
}
|
||||
|
||||
var err error
|
||||
@@ -79,38 +74,42 @@ func NewResticUploaderProvider(
|
||||
}
|
||||
}
|
||||
|
||||
provider.repoEnv, err = restic.CmdEnv(bsl, credGetter.FromFile)
|
||||
provider.cmdEnv, err = restic.CmdEnv(bsl, credGetter.FromFile)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error generating repository cmnd env")
|
||||
}
|
||||
|
||||
// #4820: restrieve insecureSkipTLSVerify from BSL configuration for
|
||||
// AWS plugin. If nothing is return, that means insecureSkipTLSVerify
|
||||
// is not enable for Restic command.
|
||||
skipTLSRet := restic.GetInsecureSkipTLSVerifyFromBSL(bsl, log)
|
||||
if len(skipTLSRet) > 0 {
|
||||
provider.extraFlags = append(provider.extraFlags, skipTLSRet)
|
||||
}
|
||||
|
||||
return &provider, nil
|
||||
}
|
||||
|
||||
// Not implement yet
|
||||
func (rp *resticProvider) Cancel() {
|
||||
}
|
||||
|
||||
func (rp *resticProvider) Close(ctx context.Context) error {
|
||||
if err := os.Remove(rp.credentialsFile); err != nil {
|
||||
return errors.Wrapf(err, "failed to remove file %s", rp.credentialsFile)
|
||||
rp.log.Warnf("Failed to remove file %s with err %v", rp.credentialsFile, err)
|
||||
}
|
||||
if err := os.Remove(rp.caCertFile); err != nil {
|
||||
return errors.Wrapf(err, "failed to remove file %s", rp.caCertFile)
|
||||
rp.log.Warnf("Failed to remove file %s with err %v", rp.caCertFile, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RunBackup runs a `backup` command and watches the output to provide
|
||||
// progress updates to the caller.
|
||||
// progress updates to the caller and return snapshotID, isEmptySnapshot, error
|
||||
func (rp *resticProvider) RunBackup(
|
||||
ctx context.Context,
|
||||
path string,
|
||||
tags map[string]string,
|
||||
parentSnapshot string,
|
||||
updater uploader.ProgressUpdater) (string, error) {
|
||||
updater uploader.ProgressUpdater) (string, bool, error) {
|
||||
if updater == nil {
|
||||
return "", errors.New("Need to initial backup progress updater first")
|
||||
return "", false, errors.New("Need to initial backup progress updater first")
|
||||
}
|
||||
|
||||
log := rp.log.WithFields(logrus.Fields{
|
||||
@@ -118,104 +117,33 @@ func (rp *resticProvider) RunBackup(
|
||||
"parentSnapshot": parentSnapshot,
|
||||
})
|
||||
|
||||
// buffers for copying command stdout/err output into
|
||||
stdoutBuf := new(bytes.Buffer)
|
||||
stderrBuf := new(bytes.Buffer)
|
||||
|
||||
// create a channel to signal when to end the goroutine scanning for progress
|
||||
// updates
|
||||
quit := make(chan struct{})
|
||||
|
||||
rp.backupTags = tags
|
||||
backupCmd := ResticBackupFunc(rp.repoIdentifier, rp.credentialsFile, path, tags)
|
||||
backupCmd.Env = rp.repoEnv
|
||||
backupCmd := ResticBackupCMDFunc(rp.repoIdentifier, rp.credentialsFile, path, tags)
|
||||
backupCmd.Env = rp.cmdEnv
|
||||
backupCmd.CACertFile = rp.caCertFile
|
||||
|
||||
backupCmd.ExtraFlags = rp.extraFlags
|
||||
if parentSnapshot != "" {
|
||||
backupCmd.ExtraFlags = append(backupCmd.ExtraFlags, fmt.Sprintf("--parent=%s", parentSnapshot))
|
||||
}
|
||||
|
||||
// #4820: restrieve insecureSkipTLSVerify from BSL configuration for
|
||||
// AWS plugin. If nothing is return, that means insecureSkipTLSVerify
|
||||
// is not enable for Restic command.
|
||||
skipTLSRet := restic.GetInsecureSkipTLSVerifyFromBSL(rp.bsl, rp.log)
|
||||
if len(skipTLSRet) > 0 {
|
||||
backupCmd.ExtraFlags = append(backupCmd.ExtraFlags, skipTLSRet)
|
||||
}
|
||||
|
||||
cmd := backupCmd.Cmd()
|
||||
cmd.Stdout = stdoutBuf
|
||||
cmd.Stderr = stderrBuf
|
||||
|
||||
err := cmd.Start()
|
||||
summary, stderrBuf, err := restic.RunBackup(backupCmd, log, updater)
|
||||
if err != nil {
|
||||
log.Errorf("failed to execute %v with stderr %v error %v", cmd, stderrBuf.String(), err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(backupProgressCheckInterval)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
lastLine := restic.GetLastLine(stdoutBuf.Bytes())
|
||||
if len(lastLine) > 0 {
|
||||
stat, err := restic.DecodeBackupStatusLine(lastLine)
|
||||
if err != nil {
|
||||
rp.log.WithError(err).Errorf("error getting backup progress")
|
||||
}
|
||||
|
||||
// if the line contains a non-empty bytes_done field, we can update the
|
||||
// caller with the progress
|
||||
if stat.BytesDone != 0 {
|
||||
updater.UpdateProgress(&uploader.UploaderProgress{
|
||||
TotalBytes: stat.TotalBytesProcessed,
|
||||
BytesDone: stat.TotalBytesProcessed,
|
||||
})
|
||||
}
|
||||
}
|
||||
case <-quit:
|
||||
ticker.Stop()
|
||||
return
|
||||
}
|
||||
if strings.Contains(err.Error(), "snapshot is empty") {
|
||||
log.Debugf("Restic backup got empty dir with %s path", path)
|
||||
return "", true, nil
|
||||
}
|
||||
}()
|
||||
|
||||
err = cmd.Wait()
|
||||
if err != nil {
|
||||
return "", errors.WithStack(fmt.Errorf("failed to wait execute %v with stderr %v error %v", cmd, stderrBuf.String(), err))
|
||||
return "", false, errors.WithStack(fmt.Errorf("error running restic backup with error: %v", err))
|
||||
}
|
||||
quit <- struct{}{}
|
||||
|
||||
summary, err := restic.GetSummaryLine(stdoutBuf.Bytes())
|
||||
if err != nil {
|
||||
return "", errors.WithStack(fmt.Errorf("failed to get summary %v with error %v", stderrBuf.String(), err))
|
||||
}
|
||||
stat, err := restic.DecodeBackupStatusLine(summary)
|
||||
if err != nil {
|
||||
return "", errors.WithStack(fmt.Errorf("failed to decode summary %v with error %v", string(summary), err))
|
||||
}
|
||||
if stat.MessageType != "summary" {
|
||||
return "", errors.WithStack(fmt.Errorf("error getting backup summary: %s", string(summary)))
|
||||
}
|
||||
|
||||
// update progress to 100%
|
||||
updater.UpdateProgress(&uploader.UploaderProgress{
|
||||
TotalBytes: stat.TotalBytesProcessed,
|
||||
BytesDone: stat.TotalBytesProcessed,
|
||||
})
|
||||
|
||||
//GetSnapshtotID
|
||||
snapshotIdCmd := restic.GetSnapshotCommand(rp.repoIdentifier, rp.credentialsFile, rp.backupTags)
|
||||
snapshotIdCmd.Env = rp.repoEnv
|
||||
// GetSnapshotID
|
||||
snapshotIdCmd := restic.GetSnapshotCommand(rp.repoIdentifier, rp.credentialsFile, tags)
|
||||
snapshotIdCmd.Env = rp.cmdEnv
|
||||
snapshotIdCmd.CACertFile = rp.caCertFile
|
||||
|
||||
snapshotID, err := restic.GetSnapshotID(snapshotIdCmd)
|
||||
if err != nil {
|
||||
return "", errors.WithStack(fmt.Errorf("error getting snapshot id with error: %s", err))
|
||||
return "", false, errors.WithStack(fmt.Errorf("error getting snapshot id with error: %v", err))
|
||||
}
|
||||
log.Debugf("Ran command=%s, stdout=%s, stderr=%s", cmd.String(), summary, stderrBuf.String())
|
||||
return snapshotID, nil
|
||||
log.Debugf("Run command=%s, stdout=%s, stderr=%s", backupCmd.String(), summary, stderrBuf)
|
||||
return snapshotID, false, nil
|
||||
}
|
||||
|
||||
// RunRestore runs a `restore` command and monitors the volume size to
|
||||
@@ -225,68 +153,21 @@ func (rp *resticProvider) RunRestore(
|
||||
snapshotID string,
|
||||
volumePath string,
|
||||
updater uploader.ProgressUpdater) error {
|
||||
if updater == nil {
|
||||
return errors.New("Need to initial backup progress updater first")
|
||||
}
|
||||
log := rp.log.WithFields(logrus.Fields{
|
||||
"snapshotID": snapshotID,
|
||||
"volumePath": volumePath,
|
||||
})
|
||||
|
||||
restoreCmd := ResticRestoreFunc(rp.repoIdentifier, rp.credentialsFile, snapshotID, volumePath)
|
||||
restoreCmd.Env = rp.repoEnv
|
||||
restoreCmd := ResticRestoreCMDFunc(rp.repoIdentifier, rp.credentialsFile, snapshotID, volumePath)
|
||||
restoreCmd.Env = rp.cmdEnv
|
||||
restoreCmd.CACertFile = rp.caCertFile
|
||||
restoreCmd.ExtraFlags = rp.extraFlags
|
||||
|
||||
// #4820: restrieve insecureSkipTLSVerify from BSL configuration for
|
||||
// AWS plugin. If nothing is return, that means insecureSkipTLSVerify
|
||||
// is not enable for Restic command.
|
||||
skipTLSRet := restic.GetInsecureSkipTLSVerifyFromBSL(rp.bsl, log)
|
||||
if len(skipTLSRet) > 0 {
|
||||
restoreCmd.ExtraFlags = append(restoreCmd.ExtraFlags, skipTLSRet)
|
||||
}
|
||||
snapshotSize, err := ResticGetSnapshotSizeFunc(restoreCmd.RepoIdentifier, restoreCmd.PasswordFile, restoreCmd.CACertFile, restoreCmd.Args[0], restoreCmd.Env, skipTLSRet)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error getting snapshot size")
|
||||
}
|
||||
stdout, stderr, err := restic.RunRestore(restoreCmd, log, updater)
|
||||
|
||||
updater.UpdateProgress(&uploader.UploaderProgress{
|
||||
TotalBytes: snapshotSize,
|
||||
})
|
||||
|
||||
// create a channel to signal when to end the goroutine scanning for progress
|
||||
// updates
|
||||
quit := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(restoreProgressCheckInterval)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
volumeSize, err := ResticGetVolumeSizeFunc(restoreCmd.Dir)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("error getting volume size for restore dir %v", restoreCmd.Dir)
|
||||
return
|
||||
}
|
||||
if volumeSize != 0 {
|
||||
updater.UpdateProgress(&uploader.UploaderProgress{
|
||||
TotalBytes: snapshotSize,
|
||||
BytesDone: volumeSize,
|
||||
})
|
||||
}
|
||||
case <-quit:
|
||||
ticker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
stdout, stderr, err := ResticRunCMDFunc(restoreCmd.Cmd())
|
||||
quit <- struct{}{}
|
||||
if err != nil {
|
||||
return errors.Wrap(err, fmt.Sprintf("failed to execute restore command %v with stderr %v", restoreCmd.Cmd().String(), stderr))
|
||||
}
|
||||
// update progress to 100%
|
||||
updater.UpdateProgress(&uploader.UploaderProgress{
|
||||
TotalBytes: snapshotSize,
|
||||
BytesDone: snapshotSize,
|
||||
})
|
||||
log.Debugf("Ran command=%s, stdout=%s, stderr=%s", restoreCmd.Command, stdout, stderr)
|
||||
log.Debugf("Run command=%s, stdout=%s, stderr=%s", restoreCmd.Command, stdout, stderr)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
@@ -29,6 +28,7 @@ import (
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
|
||||
"github.com/vmware-tanzu/velero/pkg/restic"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
)
|
||||
|
||||
func TestResticRunBackup(t *testing.T) {
|
||||
@@ -36,9 +36,10 @@ func TestResticRunBackup(t *testing.T) {
|
||||
rp.log = logrus.New()
|
||||
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: rp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)}
|
||||
testCases := []struct {
|
||||
name string
|
||||
hookBackupFunc func(repoIdentifier string, passwordFile string, path string, tags map[string]string) *restic.Command
|
||||
errorHandleFunc func(err error) bool
|
||||
name string
|
||||
hookBackupFunc func(repoIdentifier string, passwordFile string, path string, tags map[string]string) *restic.Command
|
||||
hookRunBackupFunc func(backupCmd *restic.Command, log logrus.FieldLogger, updater uploader.ProgressUpdater) (string, string, error)
|
||||
errorHandleFunc func(err error) bool
|
||||
}{
|
||||
{
|
||||
name: "wrong restic execute command",
|
||||
@@ -62,71 +63,40 @@ func TestResticRunBackup(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ResticBackupFunc = tc.hookBackupFunc
|
||||
_, err := rp.RunBackup(context.Background(), "var", nil, "", &updater)
|
||||
ResticBackupCMDFunc = tc.hookBackupFunc
|
||||
_, _, err := rp.RunBackup(context.Background(), "var", nil, "", &updater)
|
||||
rp.log.Infof("test name %v error %v", tc.name, err)
|
||||
require.Equal(t, true, tc.errorHandleFunc(err))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestResticRunRestore(t *testing.T) {
|
||||
var rp resticProvider
|
||||
rp.log = logrus.New()
|
||||
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: rp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)}
|
||||
ResticRestoreFunc = func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command {
|
||||
ResticRestoreCMDFunc = func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command {
|
||||
return &restic.Command{Args: []string{""}}
|
||||
}
|
||||
testCases := []struct {
|
||||
name string
|
||||
hookResticGetVolumeSizeFunc func(path string) (int64, error)
|
||||
hookResticGetSnapshotSizeFunc func(repoIdentifier, passwordFile, caCertFile, snapshotID string, env []string, insecureTLS string) (int64, error)
|
||||
hookResticRestoreFunc func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command
|
||||
errorHandleFunc func(err error) bool
|
||||
name string
|
||||
hookResticRestoreFunc func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command
|
||||
errorHandleFunc func(err error) bool
|
||||
}{
|
||||
{
|
||||
name: "failed to get snapshot",
|
||||
hookResticGetVolumeSizeFunc: func(path string) (int64, error) { return 100, nil },
|
||||
hookResticGetSnapshotSizeFunc: func(repoIdentifier, passwordFile, caCertFile, snapshotID string, env []string, insecureTLS string) (int64, error) {
|
||||
return 100, errors.New("failed to get snapshot")
|
||||
},
|
||||
hookResticRestoreFunc: func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command {
|
||||
return &restic.Command{Args: []string{""}}
|
||||
},
|
||||
errorHandleFunc: func(err error) bool { return strings.Contains(err.Error(), "failed to get snapshot") },
|
||||
},
|
||||
{
|
||||
name: "failed to get volume size",
|
||||
hookResticGetVolumeSizeFunc: func(path string) (int64, error) { return 100, errors.New("failed to get volume size") },
|
||||
hookResticGetSnapshotSizeFunc: func(repoIdentifier, passwordFile, caCertFile, snapshotID string, env []string, insecureTLS string) (int64, error) {
|
||||
return 100, nil
|
||||
},
|
||||
hookResticRestoreFunc: func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command {
|
||||
return &restic.Command{Args: []string{""}}
|
||||
},
|
||||
errorHandleFunc: func(err error) bool {
|
||||
return strings.Contains(err.Error(), "failed to execute restore command restic")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "wrong restic execute command",
|
||||
hookResticGetVolumeSizeFunc: func(path string) (int64, error) { return 100, nil },
|
||||
hookResticGetSnapshotSizeFunc: func(repoIdentifier, passwordFile, caCertFile, snapshotID string, env []string, insecureTLS string) (int64, error) {
|
||||
return 100, nil
|
||||
},
|
||||
name: "wrong restic execute command",
|
||||
hookResticRestoreFunc: func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command {
|
||||
return &restic.Command{Args: []string{"date"}}
|
||||
},
|
||||
errorHandleFunc: func(err error) bool {
|
||||
return strings.Contains(err.Error(), "failed to execute restore command restic")
|
||||
return strings.Contains(err.Error(), "executable file not found ")
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ResticGetSnapshotSizeFunc = tc.hookResticGetSnapshotSizeFunc
|
||||
ResticGetVolumeSizeFunc = tc.hookResticGetVolumeSizeFunc
|
||||
ResticRestoreFunc = tc.hookResticRestoreFunc
|
||||
ResticRestoreCMDFunc = tc.hookResticRestoreFunc
|
||||
err := rp.RunRestore(context.Background(), "", "var", &updater)
|
||||
rp.log.Infof("test name %v error %v", tc.name, err)
|
||||
require.Equal(t, true, tc.errorHandleFunc(err))
|
||||
|
||||
Reference in New Issue
Block a user