Kopia Pod Volume Backup/Restore (#5259)

* kopia pvbr

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
lyndon
2022-09-05 10:29:30 +08:00
committed by GitHub
parent a5a3df193d
commit 0282e65221
19 changed files with 441 additions and 95 deletions

View File

@@ -0,0 +1 @@
Fill gaps for Kopia path of PVBR: integrate Repo Manager with Unified Repo; pass UploaderType to PVBR backupper and restorer; pass RepositoryType to BackupRepository controller and Repo Ensurer

View File

@@ -52,8 +52,8 @@ const (
BackupRepositoryPhaseReady BackupRepositoryPhase = "Ready"
BackupRepositoryPhaseNotReady BackupRepositoryPhase = "NotReady"
BackupRepositoryTypeRestic string = "restic"
BackupRepositoryTypeUnified string = "unified"
BackupRepositoryTypeRestic string = "restic"
BackupRepositoryTypeKopia string = "kopia"
)
// BackupRepositoryStatus is the current status of a BackupRepository.

View File

@@ -40,16 +40,19 @@ const (
// PodVolumeOperationTimeoutAnnotation is the annotation key used to apply
// a backup/restore-specific timeout value for pod volume operations (i.e.
// restic backups/restores).
// pod volume backups/restores).
PodVolumeOperationTimeoutAnnotation = "velero.io/pod-volume-timeout"
// StorageLocationLabel is the label key used to identify the storage
// location of a backup.
StorageLocationLabel = "velero.io/storage-location"
// ResticVolumeNamespaceLabel is the label key used to identify which
// namespace a restic repository stores pod volume backups for.
ResticVolumeNamespaceLabel = "velero.io/volume-namespace"
// VolumeNamespaceLabel is the label key used to identify which
// namespace a repository stores backups for.
VolumeNamespaceLabel = "velero.io/volume-namespace"
// RepositoryTypeLabel is the label key used to identify the type of a repository
RepositoryTypeLabel = "velero.io/repository-type"
// SourceClusterK8sVersionAnnotation is the label key used to identify the k8s
// git version of the backup , i.e. v1.16.4

View File

@@ -79,6 +79,7 @@ type kubernetesBackupper struct {
resticTimeout time.Duration
defaultVolumesToRestic bool
clientPageSize int
uploaderType string
}
func (i *itemKey) String() string {
@@ -105,6 +106,7 @@ func NewKubernetesBackupper(
resticTimeout time.Duration,
defaultVolumesToRestic bool,
clientPageSize int,
uploaderType string,
) (Backupper, error) {
return &kubernetesBackupper{
backupClient: backupClient,
@@ -115,6 +117,7 @@ func NewKubernetesBackupper(
resticTimeout: resticTimeout,
defaultVolumesToRestic: defaultVolumesToRestic,
clientPageSize: clientPageSize,
uploaderType: uploaderType,
}, nil
}
@@ -240,7 +243,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(log logrus.FieldLogger,
var resticBackupper podvolume.Backupper
if kb.resticBackupperFactory != nil {
resticBackupper, err = kb.resticBackupperFactory.NewBackupper(ctx, backupRequest.Backup)
resticBackupper, err = kb.resticBackupperFactory.NewBackupper(ctx, backupRequest.Backup, kb.uploaderType)
if err != nil {
log.WithError(errors.WithStack(err)).Debugf("Error from NewBackupper")
return errors.WithStack(err)

View File

@@ -2596,7 +2596,7 @@ func TestBackupWithHooks(t *testing.T) {
type fakeResticBackupperFactory struct{}
func (f *fakeResticBackupperFactory) NewBackupper(context.Context, *velerov1.Backup) (podvolume.Backupper, error) {
func (f *fakeResticBackupperFactory) NewBackupper(context.Context, *velerov1.Backup, string) (podvolume.Backupper, error) {
return &fakeResticBackupper{}, nil
}

View File

@@ -131,7 +131,7 @@ type serverConfig struct {
clientPageSize int
profilerAddress string
formatFlag *logging.FormatFlag
defaultResticMaintenanceFrequency time.Duration
repoMaintenanceFrequency time.Duration
garbageCollectionFrequency time.Duration
defaultVolumesToRestic bool
uploaderType string
@@ -147,25 +147,24 @@ func NewCommand(f client.Factory) *cobra.Command {
volumeSnapshotLocations = flag.NewMap().WithKeyValueDelimiter(':')
logLevelFlag = logging.LogLevelFlag(logrus.InfoLevel)
config = serverConfig{
pluginDir: "/plugins",
metricsAddress: defaultMetricsAddress,
defaultBackupLocation: "default",
defaultVolumeSnapshotLocations: make(map[string]string),
backupSyncPeriod: defaultBackupSyncPeriod,
defaultBackupTTL: defaultBackupTTL,
defaultCSISnapshotTimeout: defaultCSISnapshotTimeout,
storeValidationFrequency: defaultStoreValidationFrequency,
podVolumeOperationTimeout: defaultPodVolumeOperationTimeout,
restoreResourcePriorities: defaultRestorePriorities,
clientQPS: defaultClientQPS,
clientBurst: defaultClientBurst,
clientPageSize: defaultClientPageSize,
profilerAddress: defaultProfilerAddress,
resourceTerminatingTimeout: defaultResourceTerminatingTimeout,
formatFlag: logging.NewFormatFlag(),
defaultResticMaintenanceFrequency: restic.DefaultMaintenanceFrequency,
defaultVolumesToRestic: restic.DefaultVolumesToRestic,
uploaderType: uploader.ResticType,
pluginDir: "/plugins",
metricsAddress: defaultMetricsAddress,
defaultBackupLocation: "default",
defaultVolumeSnapshotLocations: make(map[string]string),
backupSyncPeriod: defaultBackupSyncPeriod,
defaultBackupTTL: defaultBackupTTL,
defaultCSISnapshotTimeout: defaultCSISnapshotTimeout,
storeValidationFrequency: defaultStoreValidationFrequency,
podVolumeOperationTimeout: defaultPodVolumeOperationTimeout,
restoreResourcePriorities: defaultRestorePriorities,
clientQPS: defaultClientQPS,
clientBurst: defaultClientBurst,
clientPageSize: defaultClientPageSize,
profilerAddress: defaultProfilerAddress,
resourceTerminatingTimeout: defaultResourceTerminatingTimeout,
formatFlag: logging.NewFormatFlag(),
defaultVolumesToRestic: restic.DefaultVolumesToRestic,
uploaderType: uploader.ResticType,
}
)
@@ -228,7 +227,7 @@ func NewCommand(f client.Factory) *cobra.Command {
command.Flags().StringVar(&config.profilerAddress, "profiler-address", config.profilerAddress, "The address to expose the pprof profiler.")
command.Flags().DurationVar(&config.resourceTerminatingTimeout, "terminating-resource-timeout", config.resourceTerminatingTimeout, "How long to wait on persistent volumes and namespaces to terminate during a restore before timing out.")
command.Flags().DurationVar(&config.defaultBackupTTL, "default-backup-ttl", config.defaultBackupTTL, "How long to wait by default before backups can be garbage collected.")
command.Flags().DurationVar(&config.defaultResticMaintenanceFrequency, "default-restic-prune-frequency", config.defaultResticMaintenanceFrequency, "How often 'restic prune' is run for restic repositories by default.")
command.Flags().DurationVar(&config.repoMaintenanceFrequency, "default-restic-prune-frequency", config.repoMaintenanceFrequency, "How often 'prune' is run for backup repositories by default.")
command.Flags().DurationVar(&config.garbageCollectionFrequency, "garbage-collection-frequency", config.garbageCollectionFrequency, "How often garbage collection is run for expired backups.")
command.Flags().BoolVar(&config.defaultVolumesToRestic, "default-volumes-to-restic", config.defaultVolumesToRestic, "Backup all volumes with restic by default.")
command.Flags().StringVar(&config.uploaderType, "uploader-type", config.uploaderType, "Type of uploader to handle the transfer of data of pod volumes")
@@ -260,6 +259,7 @@ type server struct {
config serverConfig
mgr manager.Manager
credentialFileStore credentials.FileStore
credentialSecretStore credentials.SecretStore
}
func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*server, error) {
@@ -349,6 +349,8 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s
return nil, err
}
credentialSecretStore, err := credentials.NewNamespacedSecretStore(mgr.GetClient(), f.Namespace())
s := &server{
namespace: f.Namespace(),
metricsAddress: config.metricsAddress,
@@ -368,6 +370,7 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s
config: config,
mgr: mgr,
credentialFileStore: credentialFileStore,
credentialSecretStore: credentialSecretStore,
}
return s, nil
@@ -546,7 +549,7 @@ func (s *server) initRestic() error {
s.repoLocker = repository.NewRepoLocker()
s.repoEnsurer = repository.NewRepositoryEnsurer(s.sharedInformerFactory.Velero().V1().BackupRepositories(), s.veleroClient.VeleroV1(), s.logger)
s.repoManager = repository.NewManager(s.namespace, s.mgr.GetClient(), s.repoLocker, s.repoEnsurer, s.credentialFileStore, s.logger)
s.repoManager = repository.NewManager(s.namespace, s.mgr.GetClient(), s.repoLocker, s.repoEnsurer, s.credentialFileStore, s.credentialSecretStore, s.logger)
return nil
}
@@ -620,6 +623,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.config.podVolumeOperationTimeout,
s.config.defaultVolumesToRestic,
s.config.clientPageSize,
s.config.uploaderType,
)
cmd.CheckError(err)
@@ -781,7 +785,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
}
if _, ok := enabledRuntimeControllers[controller.ResticRepo]; ok {
if err := controller.NewResticRepoReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.config.defaultResticMaintenanceFrequency, s.repoManager).SetupWithManager(s.mgr); err != nil {
if err := controller.NewResticRepoReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.config.repoMaintenanceFrequency, s.repoManager).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.ResticRepo)
}
}

View File

@@ -45,6 +45,8 @@ import (
"github.com/vmware-tanzu/velero/pkg/util/kube"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/pkg/podvolume"
)
const (
@@ -506,17 +508,5 @@ func getSnapshotsInBackup(ctx context.Context, backup *velerov1api.Backup, kbCli
return nil, errors.WithStack(err)
}
var res []repository.SnapshotIdentifier
for _, item := range podVolumeBackups.Items {
if item.Status.SnapshotID == "" {
continue
}
res = append(res, repository.SnapshotIdentifier{
VolumeNamespace: item.Spec.Pod.Namespace,
BackupStorageLocation: backup.Spec.StorageLocation,
SnapshotID: item.Status.SnapshotID,
})
}
return res, nil
return podvolume.GetSnapshotIdentifier(podVolumeBackups), nil
}

View File

@@ -771,10 +771,12 @@ func TestGetSnapshotsInBackup(t *testing.T) {
{
VolumeNamespace: "ns-1",
SnapshotID: "snap-3",
RepositoryType: "restic",
},
{
VolumeNamespace: "ns-1",
SnapshotID: "snap-4",
RepositoryType: "restic",
},
},
},
@@ -822,6 +824,7 @@ func TestGetSnapshotsInBackup(t *testing.T) {
{
VolumeNamespace: "ns-1",
SnapshotID: "snap-3",
RepositoryType: "restic",
},
},
},

View File

@@ -32,39 +32,34 @@ import (
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/repository"
repoconfig "github.com/vmware-tanzu/velero/pkg/repository/config"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
const (
repoSyncPeriod = 5 * time.Minute
repoSyncPeriod = 5 * time.Minute
defaultMaintainFrequency = 7 * 24 * time.Hour
)
type ResticRepoReconciler struct {
client.Client
namespace string
logger logrus.FieldLogger
clock clock.Clock
defaultMaintenanceFrequency time.Duration
repositoryManager repository.Manager
namespace string
logger logrus.FieldLogger
clock clock.Clock
maintenanceFrequency time.Duration
repositoryManager repository.Manager
}
func NewResticRepoReconciler(namespace string, logger logrus.FieldLogger, client client.Client,
defaultMaintenanceFrequency time.Duration, repositoryManager repository.Manager) *ResticRepoReconciler {
maintenanceFrequency time.Duration, repositoryManager repository.Manager) *ResticRepoReconciler {
c := &ResticRepoReconciler{
client,
namespace,
logger,
clock.RealClock{},
defaultMaintenanceFrequency,
maintenanceFrequency,
repositoryManager,
}
if c.defaultMaintenanceFrequency <= 0 {
logger.Infof("Invalid default restic maintenance frequency, setting to %v", restic.DefaultMaintenanceFrequency)
c.defaultMaintenanceFrequency = restic.DefaultMaintenanceFrequency
}
return c
}
@@ -135,7 +130,7 @@ func (r *ResticRepoReconciler) initializeRepo(ctx context.Context, req *velerov1
rr.Status.Phase = velerov1api.BackupRepositoryPhaseNotReady
if rr.Spec.MaintenanceFrequency.Duration <= 0 {
rr.Spec.MaintenanceFrequency = metav1.Duration{Duration: r.defaultMaintenanceFrequency}
rr.Spec.MaintenanceFrequency = metav1.Duration{Duration: r.getRepositoryMaintenanceFrequency(req)}
}
})
}
@@ -145,7 +140,7 @@ func (r *ResticRepoReconciler) initializeRepo(ctx context.Context, req *velerov1
rr.Spec.ResticIdentifier = repoIdentifier
if rr.Spec.MaintenanceFrequency.Duration <= 0 {
rr.Spec.MaintenanceFrequency = metav1.Duration{Duration: r.defaultMaintenanceFrequency}
rr.Spec.MaintenanceFrequency = metav1.Duration{Duration: r.getRepositoryMaintenanceFrequency(req)}
}
}); err != nil {
return err
@@ -161,6 +156,23 @@ func (r *ResticRepoReconciler) initializeRepo(ctx context.Context, req *velerov1
})
}
func (r *ResticRepoReconciler) getRepositoryMaintenanceFrequency(req *velerov1api.BackupRepository) time.Duration {
if r.maintenanceFrequency > 0 {
r.logger.WithField("frequency", r.maintenanceFrequency).Info("Set user defined maintenance frequency")
return r.maintenanceFrequency
} else {
frequency, err := r.repositoryManager.DefaultMaintenanceFrequency(req)
if err != nil || frequency <= 0 {
r.logger.WithError(err).WithField("returned frequency", frequency).Warn("Failed to get maitanance frequency, use the default one")
frequency = defaultMaintainFrequency
} else {
r.logger.WithField("frequency", frequency).Info("Set matainenance according to repository suggestion")
}
return frequency
}
}
// ensureRepo checks to see if a repository exists, and attempts to initialize it if
// it does not exist. An error is returned if the repository can't be connected to
// or initialized.

View File

@@ -15,20 +15,23 @@ package controller
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/repository"
repomokes "github.com/vmware-tanzu/velero/pkg/repository/mocks"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)
const defaultMaintenanceFrequency = 10 * time.Minute
const testMaintenanceFrequency = 10 * time.Minute
func mockResticRepoReconciler(t *testing.T, rr *velerov1api.BackupRepository, mockOn string, arg interface{}, ret interface{}) *ResticRepoReconciler {
mgr := &repomokes.RepositoryManager{}
@@ -39,7 +42,7 @@ func mockResticRepoReconciler(t *testing.T, rr *velerov1api.BackupRepository, mo
velerov1api.DefaultNamespace,
velerotest.NewLogger(),
velerotest.NewFakeControllerRuntimeClient(t),
defaultMaintenanceFrequency,
testMaintenanceFrequency,
mgr,
)
}
@@ -51,7 +54,7 @@ func mockResticRepositoryCR() *velerov1api.BackupRepository {
Name: "repo",
},
Spec: velerov1api.BackupRepositorySpec{
MaintenanceFrequency: metav1.Duration{defaultMaintenanceFrequency},
MaintenanceFrequency: metav1.Duration{testMaintenanceFrequency},
},
}
@@ -138,7 +141,7 @@ func TestResticRepoReconcile(t *testing.T) {
Name: "unknown",
},
Spec: velerov1api.BackupRepositorySpec{
MaintenanceFrequency: metav1.Duration{defaultMaintenanceFrequency},
MaintenanceFrequency: metav1.Duration{testMaintenanceFrequency},
},
},
expectNil: true,
@@ -151,7 +154,7 @@ func TestResticRepoReconcile(t *testing.T) {
Name: "repo",
},
Spec: velerov1api.BackupRepositorySpec{
MaintenanceFrequency: metav1.Duration{defaultMaintenanceFrequency},
MaintenanceFrequency: metav1.Duration{testMaintenanceFrequency},
},
},
expectNil: true,
@@ -164,7 +167,7 @@ func TestResticRepoReconcile(t *testing.T) {
Name: "repo",
},
Spec: velerov1api.BackupRepositorySpec{
MaintenanceFrequency: metav1.Duration{defaultMaintenanceFrequency},
MaintenanceFrequency: metav1.Duration{testMaintenanceFrequency},
},
Status: velerov1api.BackupRepositoryStatus{
Phase: velerov1api.BackupRepositoryPhaseNew,
@@ -187,3 +190,53 @@ func TestResticRepoReconcile(t *testing.T) {
})
}
}
func TestGetRepositoryMaintenanceFrequency(t *testing.T) {
tests := []struct {
name string
mgr repository.Manager
repo *velerov1api.BackupRepository
freqReturn time.Duration
freqError error
userDefinedFreq time.Duration
expectFreq time.Duration
}{
{
name: "user defined valid",
userDefinedFreq: time.Duration(time.Hour),
expectFreq: time.Duration(time.Hour),
},
{
name: "repo return valid",
freqReturn: time.Duration(time.Hour * 2),
expectFreq: time.Duration(time.Hour * 2),
},
{
name: "fall to default",
userDefinedFreq: -1,
freqError: errors.New("fake-error"),
expectFreq: defaultMaintainFrequency,
},
{
name: "fall to default, no freq error",
freqReturn: -1,
expectFreq: defaultMaintainFrequency,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
mgr := repomokes.RepositoryManager{}
mgr.On("DefaultMaintenanceFrequency", mock.Anything).Return(test.freqReturn, test.freqError)
reconciler := NewResticRepoReconciler(
velerov1api.DefaultNamespace,
velerotest.NewLogger(),
velerotest.NewFakeControllerRuntimeClient(t),
test.userDefinedFreq,
&mgr,
)
freq := reconciler.getRepositoryMaintenanceFrequency(test.repo)
assert.Equal(t, test.expectFreq, freq)
})
}
}

View File

@@ -49,6 +49,7 @@ type backupper struct {
veleroClient clientset.Interface
pvcClient corev1client.PersistentVolumeClaimsGetter
pvClient corev1client.PersistentVolumesGetter
uploaderType string
results map[string]chan *velerov1api.PodVolumeBackup
resultsLock sync.Mutex
@@ -62,6 +63,7 @@ func newBackupper(
veleroClient clientset.Interface,
pvcClient corev1client.PersistentVolumeClaimsGetter,
pvClient corev1client.PersistentVolumesGetter,
uploaderType string,
log logrus.FieldLogger,
) *backupper {
b := &backupper{
@@ -71,6 +73,7 @@ func newBackupper(
veleroClient: veleroClient,
pvcClient: pvcClient,
pvClient: pvClient,
uploaderType: uploaderType,
results: make(map[string]chan *velerov1api.PodVolumeBackup),
}
@@ -107,7 +110,13 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
return nil, nil
}
repo, err := b.repoEnsurer.EnsureRepo(b.ctx, backup.Namespace, pod.Namespace, backup.Spec.StorageLocation)
repositoryType := getRepositoryType(b.uploaderType)
if repositoryType == "" {
err := errors.Errorf("empty repository type, uploader %s", b.uploaderType)
return nil, []error{err}
}
repo, err := b.repoEnsurer.EnsureRepo(b.ctx, backup.Namespace, pod.Namespace, backup.Spec.StorageLocation, repositoryType)
if err != nil {
return nil, []error{err}
}
@@ -182,8 +191,7 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
continue
}
// TODO: Remove the hard-coded uploader type before v1.10 FC
volumeBackup := newPodVolumeBackup(backup, pod, volume, repo.Spec.ResticIdentifier, "restic", pvc)
volumeBackup := newPodVolumeBackup(backup, pod, volume, repo.Spec.ResticIdentifier, b.uploaderType, pvc)
if volumeBackup, err = b.veleroClient.VeleroV1().PodVolumeBackups(volumeBackup.Namespace).Create(context.TODO(), volumeBackup, metav1.CreateOptions{}); err != nil {
errs = append(errs, err)
continue

View File

@@ -35,7 +35,7 @@ import (
// BackupperFactory can construct pod volumes backuppers.
type BackupperFactory interface {
// NewBackupper returns a pod volumes backupper for use during a single Velero backup.
NewBackupper(context.Context, *velerov1api.Backup) (Backupper, error)
NewBackupper(context.Context, *velerov1api.Backup, string) (Backupper, error)
}
func NewBackupperFactory(repoLocker *repository.RepoLocker,
@@ -66,7 +66,7 @@ type backupperFactory struct {
log logrus.FieldLogger
}
func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1api.Backup) (Backupper, error) {
func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
informer := velerov1informers.NewFilteredPodVolumeBackupInformer(
bf.veleroClient,
backup.Namespace,
@@ -77,7 +77,7 @@ func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1ap
},
)
b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, informer, bf.veleroClient, bf.pvcClient, bf.pvClient, bf.log)
b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, informer, bf.veleroClient, bf.pvcClient, bf.pvClient, uploaderType, bf.log)
go informer.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced, bf.repoInformerSynced) {

View File

@@ -56,6 +56,7 @@ type restorer struct {
resultsLock sync.Mutex
results map[string]chan *velerov1api.PodVolumeRestore
log logrus.FieldLogger
}
func newRestorer(
@@ -75,6 +76,7 @@ func newRestorer(
pvcClient: pvcClient,
results: make(map[string]chan *velerov1api.PodVolumeRestore),
log: log,
}
podVolumeRestoreInformer.AddEventHandler(
@@ -101,12 +103,17 @@ func newRestorer(
}
func (r *restorer) RestorePodVolumes(data RestoreData) []error {
volumesToRestore := GetVolumeBackupsForPod(data.PodVolumeBackups, data.Pod, data.SourceNamespace)
volumesToRestore := getVolumeBackupInfoForPod(data.PodVolumeBackups, data.Pod, data.SourceNamespace)
if len(volumesToRestore) == 0 {
return nil
}
repo, err := r.repoEnsurer.EnsureRepo(r.ctx, data.Restore.Namespace, data.SourceNamespace, data.BackupLocation)
repositoryType, err := getVolumesRepositoryType(volumesToRestore)
if err != nil {
return []error{err}
}
repo, err := r.repoEnsurer.EnsureRepo(r.ctx, data.Restore.Namespace, data.SourceNamespace, data.BackupLocation, repositoryType)
if err != nil {
return []error{err}
}
@@ -132,7 +139,7 @@ func (r *restorer) RestorePodVolumes(data RestoreData) []error {
for _, podVolume := range data.Pod.Spec.Volumes {
podVolumes[podVolume.Name] = podVolume
}
for volume, snapshot := range volumesToRestore {
for volume, backupInfo := range volumesToRestore {
volumeObj, ok := podVolumes[volume]
var pvc *corev1api.PersistentVolumeClaim
if ok {
@@ -144,8 +151,8 @@ func (r *restorer) RestorePodVolumes(data RestoreData) []error {
}
}
}
// TODO: Remove the hard-coded uploader type before v1.10 FC
volumeRestore := newPodVolumeRestore(data.Restore, data.Pod, data.BackupLocation, volume, snapshot, repo.Spec.ResticIdentifier, "restic", pvc)
volumeRestore := newPodVolumeRestore(data.Restore, data.Pod, data.BackupLocation, volume, backupInfo.snapshotID, repo.Spec.ResticIdentifier, backupInfo.uploaderType, pvc)
if err := errorOnly(r.veleroClient.VeleroV1().PodVolumeRestores(volumeRestore.Namespace).Create(context.TODO(), volumeRestore, metav1.CreateOptions{})); err != nil {
errs = append(errs, errors.WithStack(err))
@@ -213,3 +220,29 @@ func newPodVolumeRestore(restore *velerov1api.Restore, pod *corev1api.Pod, backu
}
return pvr
}
func getVolumesRepositoryType(volumes map[string]volumeBackupInfo) (string, error) {
if len(volumes) == 0 {
return "", errors.New("empty volume list")
}
// the podVolumeBackups list come from one backup. In one backup, it is impossible that volumes are
// backed up by different uploaders or to different repositories. Asserting this ensures one repo only,
// which will simplify the following logics
repositoryType := ""
for _, backupInfo := range volumes {
if backupInfo.repositoryType == "" {
return "", errors.Errorf("empty repository type found among volume snapshots, snapshot ID %s, uploader %s",
backupInfo.snapshotID, backupInfo.uploaderType)
}
if repositoryType == "" {
repositoryType = backupInfo.repositoryType
} else if repositoryType != backupInfo.repositoryType {
return "", errors.Errorf("multiple repository type in one backup, current type %s, differential one [type %s, snapshot ID %s, uploader %s]",
repositoryType, backupInfo.repositoryType, backupInfo.snapshotID, backupInfo.uploaderType)
}
}
return repositoryType, nil
}

View File

@@ -0,0 +1,92 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podvolume
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestGetVolumesRepositoryType(t *testing.T) {
testCases := []struct {
name string
volumes map[string]volumeBackupInfo
expected string
expectedErr string
}{
{
name: "empty volume",
expectedErr: "empty volume list",
},
{
name: "empty repository type, first one",
volumes: map[string]volumeBackupInfo{
"volume1": {"fake-snapshot-id-1", "fake-uploader-1", ""},
"volume2": {"", "", "fake-type"},
},
expectedErr: "empty repository type found among volume snapshots, snapshot ID fake-snapshot-id-1, uploader fake-uploader-1",
},
{
name: "empty repository type, last one",
volumes: map[string]volumeBackupInfo{
"volume1": {"", "", "fake-type"},
"volume2": {"", "", "fake-type"},
"volume3": {"fake-snapshot-id-3", "fake-uploader-3", ""},
},
expectedErr: "empty repository type found among volume snapshots, snapshot ID fake-snapshot-id-3, uploader fake-uploader-3",
},
{
name: "empty repository type, middle one",
volumes: map[string]volumeBackupInfo{
"volume1": {"", "", "fake-type"},
"volume2": {"fake-snapshot-id-2", "fake-uploader-2", ""},
"volume3": {"", "", "fake-type"},
},
expectedErr: "empty repository type found among volume snapshots, snapshot ID fake-snapshot-id-2, uploader fake-uploader-2",
},
{
name: "mismatch repository type",
volumes: map[string]volumeBackupInfo{
"volume1": {"", "", "fake-type1"},
"volume2": {"fake-snapshot-id-2", "fake-uploader-2", "fake-type2"},
},
expectedErr: "multiple repository type in one backup, current type fake-type1, differential one [type fake-type2, snapshot ID fake-snapshot-id-2, uploader fake-uploader-2]",
},
{
name: "success",
volumes: map[string]volumeBackupInfo{
"volume1": {"", "", "fake-type"},
"volume2": {"", "", "fake-type"},
"volume3": {"", "", "fake-type"},
},
expected: "fake-type",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual, err := getVolumesRepositoryType(tc.volumes)
assert.Equal(t, tc.expected, actual)
if err != nil {
assert.EqualError(t, err, tc.expectedErr)
}
})
}
}

View File

@@ -23,6 +23,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
)
const (
@@ -48,10 +50,33 @@ const (
InitContainer = "restic-wait"
)
// volumeBackupInfo describes the backup info of a volume backed up by PodVolumeBackups
type volumeBackupInfo struct {
snapshotID string
uploaderType string
repositoryType string
}
// GetVolumeBackupsForPod returns a map, of volume name -> snapshot id,
// of the PodVolumeBackups that exist for the provided pod.
func GetVolumeBackupsForPod(podVolumeBackups []*velerov1api.PodVolumeBackup, pod *corev1api.Pod, sourcePodNs string) map[string]string {
volumeBkInfo := getVolumeBackupInfoForPod(podVolumeBackups, pod, sourcePodNs)
if volumeBkInfo == nil {
return nil
}
volumes := make(map[string]string)
for k, v := range volumeBkInfo {
volumes[k] = v.snapshotID
}
return volumes
}
// getVolumeBackupInfoForPod returns a map, of volume name -> VolumeBackupInfo,
// of the PodVolumeBackups that exist for the provided pod.
func getVolumeBackupInfoForPod(podVolumeBackups []*velerov1api.PodVolumeBackup, pod *corev1api.Pod, sourcePodNs string) map[string]volumeBackupInfo {
volumes := make(map[string]volumeBackupInfo)
for _, pvb := range podVolumeBackups {
if !isPVBMatchPod(pvb, pod.GetName(), sourcePodNs) {
@@ -71,14 +96,74 @@ func GetVolumeBackupsForPod(podVolumeBackups []*velerov1api.PodVolumeBackup, pod
continue
}
volumes[pvb.Spec.Volume] = pvb.Status.SnapshotID
volumes[pvb.Spec.Volume] = volumeBackupInfo{
snapshotID: pvb.Status.SnapshotID,
uploaderType: getUploaderTypeOrDefault(pvb.Spec.UploaderType),
repositoryType: getRepositoryType(pvb.Spec.UploaderType),
}
}
if len(volumes) > 0 {
return volumes
}
return getPodSnapshotAnnotations(pod)
fromAnnntation := getPodSnapshotAnnotations(pod)
if fromAnnntation == nil {
return nil
}
for k, v := range fromAnnntation {
volumes[k] = volumeBackupInfo{v, uploader.ResticType, velerov1api.BackupRepositoryTypeRestic}
}
return volumes
}
// GetSnapshotIdentifier returns the snapshots represented by SnapshotIdentifier for the given PVBs
func GetSnapshotIdentifier(podVolumeBackups *velerov1api.PodVolumeBackupList) []repository.SnapshotIdentifier {
var res []repository.SnapshotIdentifier
for _, item := range podVolumeBackups.Items {
if item.Status.SnapshotID == "" {
continue
}
res = append(res, repository.SnapshotIdentifier{
VolumeNamespace: item.Spec.Pod.Namespace,
BackupStorageLocation: item.Spec.BackupStorageLocation,
SnapshotID: item.Status.SnapshotID,
RepositoryType: getRepositoryType(item.Spec.UploaderType),
})
}
return res
}
func getUploaderTypeOrDefault(uploaderType string) string {
if uploaderType != "" {
return uploaderType
} else {
return uploader.ResticType
}
}
// getRepositoryType returns the hardcode repositoryType for different backup methods - Restic or Kopia,uploaderType
// indicates the method.
// For Restic backup method, it is always hardcode to BackupRepositoryTypeRestic, never changed.
// For Kopia backup method, this means we hardcode repositoryType as BackupRepositoryTypeKopia for Unified Repo,
// at present (Kopia backup method is using Unified Repo). However, it doesn't mean we could deduce repositoryType
// from uploaderType for Unified Repo.
// TODO: post v1.10, refactor this function for Kopia backup method. In future, when we have multiple implementations of
// Unified Repo (besides Kopia), we will add the repositoryType to BSL, because by then, we are not able to hardcode
// the repositoryType to BackupRepositoryTypeKopia for Unified Repo.
func getRepositoryType(uploaderType string) string {
switch uploaderType {
case "", uploader.ResticType:
return velerov1api.BackupRepositoryTypeRestic
case uploader.KopiaType:
return velerov1api.BackupRepositoryTypeKopia
default:
return ""
}
}
func isPVBMatchPod(pvb *velerov1api.PodVolumeBackup, podName string, namespace string) bool {

View File

@@ -53,6 +53,7 @@ type RepositoryEnsurer struct {
type repoKey struct {
volumeNamespace string
backupLocation string
repositoryType string
}
func NewRepositoryEnsurer(repoInformer velerov1informers.BackupRepositoryInformer, repoClient velerov1client.BackupRepositoriesGetter, log logrus.FieldLogger) *RepositoryEnsurer {
@@ -83,7 +84,7 @@ func NewRepositoryEnsurer(repoInformer velerov1informers.BackupRepositoryInforme
r.repoChansLock.Lock()
defer r.repoChansLock.Unlock()
key := repoLabels(newObj.Spec.VolumeNamespace, newObj.Spec.BackupStorageLocation).String()
key := repoLabels(newObj.Spec.VolumeNamespace, newObj.Spec.BackupStorageLocation, newObj.Spec.RepositoryType).String()
repoChan, ok := r.repoChans[key]
if !ok {
log.Debugf("No ready channel found for repository %s/%s", newObj.Namespace, newObj.Name)
@@ -98,20 +99,25 @@ func NewRepositoryEnsurer(repoInformer velerov1informers.BackupRepositoryInforme
return r
}
func repoLabels(volumeNamespace, backupLocation string) labels.Set {
func repoLabels(volumeNamespace, backupLocation, repositoryType string) labels.Set {
return map[string]string{
velerov1api.ResticVolumeNamespaceLabel: label.GetValidName(volumeNamespace),
velerov1api.StorageLocationLabel: label.GetValidName(backupLocation),
velerov1api.VolumeNamespaceLabel: label.GetValidName(volumeNamespace),
velerov1api.StorageLocationLabel: label.GetValidName(backupLocation),
velerov1api.RepositoryTypeLabel: label.GetValidName(repositoryType),
}
}
func (r *RepositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNamespace, backupLocation string) (*velerov1api.BackupRepository, error) {
log := r.log.WithField("volumeNamespace", volumeNamespace).WithField("backupLocation", backupLocation)
func (r *RepositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNamespace, backupLocation, repositoryType string) (*velerov1api.BackupRepository, error) {
if volumeNamespace == "" || backupLocation == "" || repositoryType == "" {
return nil, errors.Errorf("wrong parameters, namespace %q, backup storage location %q, repository type %q", volumeNamespace, backupLocation, repositoryType)
}
log := r.log.WithField("volumeNamespace", volumeNamespace).WithField("backupLocation", backupLocation).WithField("repositoryType", repositoryType)
// It's only safe to have one instance of this method executing concurrently for a
// given volumeNamespace + backupLocation, so synchronize based on that. It's fine
// given volumeNamespace + backupLocation + repositoryType, so synchronize based on that. It's fine
// to run concurrently for *different* namespaces/locations. If you had 2 goroutines
// running this for the same inputs, both might find no ResticRepository exists, then
// running this for the same inputs, both might find no BackupRepository exists, then
// both would create new ones for the same namespace/location.
//
// This issue could probably be avoided if we had a deterministic name for
@@ -121,7 +127,7 @@ func (r *RepositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNam
// GenerateName) which poses a backwards compatibility problem.
log.Debug("Acquiring lock")
repoMu := r.repoLock(volumeNamespace, backupLocation)
repoMu := r.repoLock(volumeNamespace, backupLocation, repositoryType)
repoMu.Lock()
defer func() {
repoMu.Unlock()
@@ -130,14 +136,14 @@ func (r *RepositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNam
log.Debug("Acquired lock")
selector := labels.SelectorFromSet(repoLabels(volumeNamespace, backupLocation))
selector := labels.SelectorFromSet(repoLabels(volumeNamespace, backupLocation, repositoryType))
repos, err := r.repoLister.BackupRepositories(namespace).List(selector)
if err != nil {
return nil, errors.WithStack(err)
}
if len(repos) > 1 {
return nil, errors.Errorf("more than one ResticRepository found for workload namespace %q, backup storage location %q", volumeNamespace, backupLocation)
return nil, errors.Errorf("more than one BackupRepository found for workload namespace %q, backup storage location %q, repository type %q", volumeNamespace, backupLocation, repositoryType)
}
if len(repos) == 1 {
if repos[0].Status.Phase != velerov1api.BackupRepositoryPhaseReady {
@@ -154,12 +160,13 @@ func (r *RepositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNam
repo := &velerov1api.BackupRepository{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
GenerateName: fmt.Sprintf("%s-%s-", volumeNamespace, backupLocation),
Labels: repoLabels(volumeNamespace, backupLocation),
GenerateName: fmt.Sprintf("%s-%s-%s-", volumeNamespace, backupLocation, repositoryType),
Labels: repoLabels(volumeNamespace, backupLocation, repositoryType),
},
Spec: velerov1api.BackupRepositorySpec{
VolumeNamespace: volumeNamespace,
BackupStorageLocation: backupLocation,
RepositoryType: repositoryType,
},
}
@@ -198,13 +205,14 @@ func (r *RepositoryEnsurer) getRepoChan(name string) chan *velerov1api.BackupRep
return r.repoChans[name]
}
func (r *RepositoryEnsurer) repoLock(volumeNamespace, backupLocation string) *sync.Mutex {
func (r *RepositoryEnsurer) repoLock(volumeNamespace, backupLocation, repositoryType string) *sync.Mutex {
r.repoLocksMu.Lock()
defer r.repoLocksMu.Unlock()
key := repoKey{
volumeNamespace: volumeNamespace,
backupLocation: backupLocation,
repositoryType: repositoryType,
}
if r.repoLocks[key] == nil {

View File

@@ -19,6 +19,7 @@ package repository
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -43,6 +44,10 @@ type SnapshotIdentifier struct {
// SnapshotID is the short ID of the restic snapshot.
SnapshotID string
// RepositoryType is the type of the repository where the
// snapshot is stored
RepositoryType string
}
// Manager manages backup repositories.
@@ -65,6 +70,8 @@ type Manager interface {
// Forget removes a snapshot from the list of
// available snapshots in a repo.
Forget(context.Context, SnapshotIdentifier) error
// DefaultMaintenanceFrequency returns the default maintenance frequency from the specific repo
DefaultMaintenanceFrequency(repo *velerov1api.BackupRepository) (time.Duration, error)
}
type manager struct {
@@ -84,6 +91,7 @@ func NewManager(
repoLocker *RepoLocker,
repoEnsurer *RepositoryEnsurer,
credentialFileStore credentials.FileStore,
credentialSecretStore credentials.SecretStore,
log logrus.FieldLogger,
) Manager {
mgr := &manager{
@@ -97,6 +105,10 @@ func NewManager(
}
mgr.providers[velerov1api.BackupRepositoryTypeRestic] = provider.NewResticRepositoryProvider(credentialFileStore, mgr.fileSystem, mgr.log)
mgr.providers[velerov1api.BackupRepositoryTypeKopia] = provider.NewUnifiedRepoProvider(credentials.CredentialGetter{
FromFile: credentialFileStore,
FromSecret: credentialSecretStore,
}, mgr.log)
return mgr
}
@@ -162,7 +174,7 @@ func (m *manager) UnlockRepo(repo *velerov1api.BackupRepository) error {
}
func (m *manager) Forget(ctx context.Context, snapshot SnapshotIdentifier) error {
repo, err := m.repoEnsurer.EnsureRepo(ctx, m.namespace, snapshot.VolumeNamespace, snapshot.BackupStorageLocation)
repo, err := m.repoEnsurer.EnsureRepo(ctx, m.namespace, snapshot.VolumeNamespace, snapshot.BackupStorageLocation, snapshot.RepositoryType)
if err != nil {
return err
}
@@ -181,10 +193,26 @@ func (m *manager) Forget(ctx context.Context, snapshot SnapshotIdentifier) error
return prd.Forget(context.Background(), snapshot.SnapshotID, param)
}
func (m *manager) DefaultMaintenanceFrequency(repo *velerov1api.BackupRepository) (time.Duration, error) {
prd, err := m.getRepositoryProvider(repo)
if err != nil {
return 0, errors.WithStack(err)
}
param, err := m.assembleRepoParam(repo)
if err != nil {
return 0, errors.WithStack(err)
}
return prd.DefaultMaintenanceFrequency(context.Background(), param), nil
}
func (m *manager) getRepositoryProvider(repo *velerov1api.BackupRepository) (provider.Provider, error) {
switch repo.Spec.RepositoryType {
case "", velerov1api.BackupRepositoryTypeRestic:
return m.providers[velerov1api.BackupRepositoryTypeRestic], nil
case velerov1api.BackupRepositoryTypeKopia:
return m.providers[velerov1api.BackupRepositoryTypeKopia], nil
default:
return nil, fmt.Errorf("failed to get provider for repository %s", repo.Spec.RepositoryType)
}

View File

@@ -26,7 +26,7 @@ import (
)
func TestGetRepositoryProvider(t *testing.T) {
mgr := NewManager("", nil, nil, nil, nil, nil).(*manager)
mgr := NewManager("", nil, nil, nil, nil, nil, nil).(*manager)
repo := &velerov1.BackupRepository{}
// empty repository type

View File

@@ -21,6 +21,8 @@ import (
mock "github.com/stretchr/testify/mock"
time "time"
v1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/podvolume"
"github.com/vmware-tanzu/velero/pkg/repository"
@@ -59,6 +61,27 @@ func (_m *RepositoryManager) Forget(_a0 context.Context, _a1 repository.Snapshot
return r0
}
// DefaultMaintenanceFrequency provides a mock function with given fields: repo
func (_m *RepositoryManager) DefaultMaintenanceFrequency(repo *v1.BackupRepository) (time.Duration, error) {
ret := _m.Called(repo)
var r0 time.Duration
if rf, ok := ret.Get(0).(func(*v1.BackupRepository) time.Duration); ok {
r0 = rf(repo)
} else {
r0 = ret.Get(0).(time.Duration)
}
var r1 error
if rf, ok := ret.Get(1).(func(*v1.BackupRepository) error); ok {
r1 = rf(repo)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// InitRepo provides a mock function with given fields: repo
func (_m *RepositoryManager) InitRepo(repo *v1.BackupRepository) error {
ret := _m.Called(repo)