mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-01-08 06:15:40 +00:00
Merge pull request #7041 from blackpiglet/6190_part_2
Remove dependency of generated client part 2
This commit is contained in:
1
changelogs/unreleased/7041-blackpiglet
Normal file
1
changelogs/unreleased/7041-blackpiglet
Normal file
@@ -0,0 +1 @@
|
||||
Remove the Velero generated client.
|
||||
@@ -26,8 +26,8 @@ import (
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/builder"
|
||||
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
|
||||
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||
"github.com/vmware-tanzu/velero/pkg/util"
|
||||
)
|
||||
|
||||
func TestIsReadyToValidate(t *testing.T) {
|
||||
@@ -163,7 +163,7 @@ func TestListBackupStorageLocations(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
g := NewWithT(t)
|
||||
|
||||
client := fake.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(tt.backupLocations).Build()
|
||||
client := fake.NewClientBuilder().WithScheme(util.VeleroScheme).WithRuntimeObjects(tt.backupLocations).Build()
|
||||
if tt.expectError {
|
||||
_, err := ListBackupStorageLocations(context.Background(), client, "ns-1")
|
||||
g.Expect(err).NotTo(BeNil())
|
||||
|
||||
@@ -25,20 +25,14 @@ import (
|
||||
)
|
||||
|
||||
func CreateRetryGenerateName(client kbclient.Client, ctx context.Context, obj kbclient.Object) error {
|
||||
return CreateRetryGenerateNameWithFunc(obj, func() error {
|
||||
return client.Create(ctx, obj, &kbclient.CreateOptions{})
|
||||
})
|
||||
}
|
||||
|
||||
func CreateRetryGenerateNameWithFunc(obj kbclient.Object, createFn func() error) error {
|
||||
retryCreateFn := func() error {
|
||||
// needed to ensure that the name from the failed create isn't left on the object between retries
|
||||
obj.SetName("")
|
||||
return createFn()
|
||||
return client.Create(ctx, obj, &kbclient.CreateOptions{})
|
||||
}
|
||||
if obj.GetGenerateName() != "" && obj.GetName() == "" {
|
||||
return retry.OnError(retry.DefaultRetry, apierrors.IsAlreadyExists, retryCreateFn)
|
||||
} else {
|
||||
return createFn()
|
||||
return client.Create(ctx, obj, &kbclient.CreateOptions{})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -124,12 +124,12 @@ func (o *CreateOptions) Run(c *cobra.Command, f client.Factory) error {
|
||||
return err
|
||||
}
|
||||
|
||||
client, err := f.Client()
|
||||
client, err := f.KubebuilderClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := client.VeleroV1().VolumeSnapshotLocations(volumeSnapshotLocation.Namespace).Create(context.TODO(), volumeSnapshotLocation, metav1.CreateOptions{}); err != nil {
|
||||
if err := client.Create(context.TODO(), volumeSnapshotLocation); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/client"
|
||||
@@ -36,18 +37,19 @@ func NewGetCommand(f client.Factory, use string) *cobra.Command {
|
||||
Run: func(c *cobra.Command, args []string) {
|
||||
err := output.ValidateFlags(c)
|
||||
cmd.CheckError(err)
|
||||
veleroClient, err := f.Client()
|
||||
client, err := f.KubebuilderClient()
|
||||
cmd.CheckError(err)
|
||||
var locations *api.VolumeSnapshotLocationList
|
||||
locations := new(api.VolumeSnapshotLocationList)
|
||||
|
||||
if len(args) > 0 {
|
||||
locations = new(api.VolumeSnapshotLocationList)
|
||||
for _, name := range args {
|
||||
location, err := veleroClient.VeleroV1().VolumeSnapshotLocations(f.Namespace()).Get(context.TODO(), name, metav1.GetOptions{})
|
||||
location := new(api.VolumeSnapshotLocation)
|
||||
err := client.Get(context.TODO(), kbclient.ObjectKey{Namespace: f.Namespace(), Name: name}, location)
|
||||
cmd.CheckError(err)
|
||||
locations.Items = append(locations.Items, *location)
|
||||
}
|
||||
} else {
|
||||
locations, err = veleroClient.VeleroV1().VolumeSnapshotLocations(f.Namespace()).List(context.TODO(), listOptions)
|
||||
err = client.List(context.TODO(), locations, &kbclient.ListOptions{Namespace: f.Namespace()})
|
||||
cmd.CheckError(err)
|
||||
}
|
||||
_, err = output.PrintWithFormat(c, locations)
|
||||
|
||||
@@ -66,7 +66,6 @@ import (
|
||||
"github.com/vmware-tanzu/velero/pkg/controller"
|
||||
velerodiscovery "github.com/vmware-tanzu/velero/pkg/discovery"
|
||||
"github.com/vmware-tanzu/velero/pkg/features"
|
||||
clientset "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
|
||||
"github.com/vmware-tanzu/velero/pkg/itemoperationmap"
|
||||
"github.com/vmware-tanzu/velero/pkg/metrics"
|
||||
"github.com/vmware-tanzu/velero/pkg/nodeagent"
|
||||
@@ -250,12 +249,12 @@ type server struct {
|
||||
metricsAddress string
|
||||
kubeClientConfig *rest.Config
|
||||
kubeClient kubernetes.Interface
|
||||
veleroClient clientset.Interface
|
||||
discoveryClient discovery.DiscoveryInterface
|
||||
discoveryHelper velerodiscovery.Helper
|
||||
dynamicClient dynamic.Interface
|
||||
csiSnapshotClient *snapshotv1client.Clientset
|
||||
csiSnapshotLister snapshotv1listers.VolumeSnapshotLister
|
||||
crClient ctrlclient.Client
|
||||
ctx context.Context
|
||||
cancelFunc context.CancelFunc
|
||||
logger logrus.FieldLogger
|
||||
@@ -305,6 +304,11 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s
|
||||
return nil, err
|
||||
}
|
||||
|
||||
crClient, err := f.KubebuilderClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pluginRegistry := process.NewRegistry(config.pluginDir, logger, logger.Level)
|
||||
if err := pluginRegistry.DiscoverPlugins(); err != nil {
|
||||
return nil, err
|
||||
@@ -380,9 +384,9 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s
|
||||
metricsAddress: config.metricsAddress,
|
||||
kubeClientConfig: clientConfig,
|
||||
kubeClient: kubeClient,
|
||||
veleroClient: veleroClient,
|
||||
discoveryClient: veleroClient.Discovery(),
|
||||
dynamicClient: dynamicClient,
|
||||
crClient: crClient,
|
||||
ctx: ctx,
|
||||
cancelFunc: cancelFunc,
|
||||
logger: logger,
|
||||
@@ -727,6 +731,11 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
|
||||
s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupStorageLocation)
|
||||
}
|
||||
|
||||
pvbInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &velerov1api.PodVolumeBackup{})
|
||||
if err != nil {
|
||||
s.logger.Fatal(err, "fail to get controller-runtime informer from manager for PVB")
|
||||
}
|
||||
|
||||
if _, ok := enabledRuntimeControllers[controller.Backup]; ok {
|
||||
backupper, err := backup.NewKubernetesBackupper(
|
||||
s.mgr.GetClient(),
|
||||
@@ -736,10 +745,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
|
||||
podvolume.NewBackupperFactory(
|
||||
s.repoLocker,
|
||||
s.repoEnsurer,
|
||||
s.veleroClient,
|
||||
s.kubeClient.CoreV1(),
|
||||
s.kubeClient.CoreV1(),
|
||||
s.kubeClient.CoreV1(),
|
||||
s.crClient,
|
||||
pvbInformer,
|
||||
s.logger,
|
||||
),
|
||||
s.config.podVolumeOperationTimeout,
|
||||
@@ -818,10 +825,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
|
||||
podvolume.NewBackupperFactory(
|
||||
s.repoLocker,
|
||||
s.repoEnsurer,
|
||||
s.veleroClient,
|
||||
s.kubeClient.CoreV1(),
|
||||
s.kubeClient.CoreV1(),
|
||||
s.kubeClient.CoreV1(),
|
||||
s.crClient,
|
||||
pvbInformer,
|
||||
s.logger,
|
||||
),
|
||||
s.config.podVolumeOperationTimeout,
|
||||
@@ -909,6 +914,11 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
|
||||
}
|
||||
}
|
||||
|
||||
pvrInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &velerov1api.PodVolumeRestore{})
|
||||
if err != nil {
|
||||
s.logger.Fatal(err, "fail to get controller-runtime informer from manager for PVR")
|
||||
}
|
||||
|
||||
if _, ok := enabledRuntimeControllers[controller.Restore]; ok {
|
||||
restorer, err := restore.NewKubernetesRestorer(
|
||||
s.discoveryHelper,
|
||||
@@ -918,10 +928,9 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
|
||||
podvolume.NewRestorerFactory(
|
||||
s.repoLocker,
|
||||
s.repoEnsurer,
|
||||
s.veleroClient,
|
||||
s.kubeClient.CoreV1(),
|
||||
s.kubeClient.CoreV1(),
|
||||
s.kubeClient,
|
||||
s.crClient,
|
||||
pvrInformer,
|
||||
s.logger,
|
||||
),
|
||||
s.config.podVolumeOperationTimeout,
|
||||
|
||||
@@ -22,13 +22,13 @@ import (
|
||||
|
||||
"github.com/pkg/errors"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -52,12 +52,18 @@ func IsRunning(ctx context.Context, kubeClient kubernetes.Interface, namespace s
|
||||
}
|
||||
|
||||
// IsRunningInNode checks if the node agent pod is running properly in a specified node. If not, return the error found
|
||||
func IsRunningInNode(ctx context.Context, namespace string, nodeName string, podClient corev1client.PodsGetter) error {
|
||||
func IsRunningInNode(ctx context.Context, namespace string, nodeName string, crClient ctrlclient.Client) error {
|
||||
if nodeName == "" {
|
||||
return errors.New("node name is empty")
|
||||
}
|
||||
|
||||
pods, err := podClient.Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("name=%s", daemonSet)})
|
||||
pods := new(v1.PodList)
|
||||
parsedSelector, err := labels.Parse(fmt.Sprintf("name=%s", daemonSet))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fail to parse selector")
|
||||
}
|
||||
|
||||
err = crClient.List(ctx, pods, &ctrlclient.ListOptions{LabelSelector: parsedSelector})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to list daemonset pods")
|
||||
}
|
||||
|
||||
@@ -27,13 +27,14 @@ import (
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
kerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
|
||||
"github.com/vmware-tanzu/velero/pkg/itemoperation"
|
||||
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
|
||||
"github.com/vmware-tanzu/velero/pkg/util"
|
||||
"github.com/vmware-tanzu/velero/pkg/volume"
|
||||
)
|
||||
|
||||
@@ -302,7 +303,9 @@ func (s *objectBackupStore) GetBackupMetadata(name string) (*velerov1api.Backup,
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
decoder := scheme.Codecs.UniversalDecoder(velerov1api.SchemeGroupVersion)
|
||||
codecFactory := serializer.NewCodecFactory(util.VeleroScheme)
|
||||
|
||||
decoder := codecFactory.UniversalDecoder(velerov1api.SchemeGroupVersion)
|
||||
obj, _, err := decoder.Decode(data, nil, nil)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
|
||||
@@ -26,13 +26,13 @@ import (
|
||||
corev1api "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/resourcepolicies"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
veleroclient "github.com/vmware-tanzu/velero/pkg/client"
|
||||
clientset "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
|
||||
"github.com/vmware-tanzu/velero/pkg/label"
|
||||
"github.com/vmware-tanzu/velero/pkg/nodeagent"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository"
|
||||
@@ -50,10 +50,7 @@ type backupper struct {
|
||||
ctx context.Context
|
||||
repoLocker *repository.RepoLocker
|
||||
repoEnsurer *repository.Ensurer
|
||||
veleroClient clientset.Interface
|
||||
pvcClient corev1client.PersistentVolumeClaimsGetter
|
||||
pvClient corev1client.PersistentVolumesGetter
|
||||
podClient corev1client.PodsGetter
|
||||
crClient ctrlclient.Client
|
||||
uploaderType string
|
||||
|
||||
results map[string]chan *velerov1api.PodVolumeBackup
|
||||
@@ -103,32 +100,31 @@ func newBackupper(
|
||||
ctx context.Context,
|
||||
repoLocker *repository.RepoLocker,
|
||||
repoEnsurer *repository.Ensurer,
|
||||
podVolumeBackupInformer cache.SharedIndexInformer,
|
||||
veleroClient clientset.Interface,
|
||||
pvcClient corev1client.PersistentVolumeClaimsGetter,
|
||||
pvClient corev1client.PersistentVolumesGetter,
|
||||
podClient corev1client.PodsGetter,
|
||||
pvbInformer ctrlcache.Informer,
|
||||
crClient ctrlclient.Client,
|
||||
uploaderType string,
|
||||
backup *velerov1api.Backup,
|
||||
log logrus.FieldLogger,
|
||||
) *backupper {
|
||||
b := &backupper{
|
||||
ctx: ctx,
|
||||
repoLocker: repoLocker,
|
||||
repoEnsurer: repoEnsurer,
|
||||
veleroClient: veleroClient,
|
||||
pvcClient: pvcClient,
|
||||
pvClient: pvClient,
|
||||
podClient: podClient,
|
||||
crClient: crClient,
|
||||
uploaderType: uploaderType,
|
||||
|
||||
results: make(map[string]chan *velerov1api.PodVolumeBackup),
|
||||
}
|
||||
|
||||
podVolumeBackupInformer.AddEventHandler(
|
||||
pvbInformer.AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(_, obj interface{}) {
|
||||
pvb := obj.(*velerov1api.PodVolumeBackup)
|
||||
|
||||
if pvb.GetLabels()[velerov1api.BackupUIDLabel] != string(backup.UID) {
|
||||
return
|
||||
}
|
||||
|
||||
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted || pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
|
||||
b.resultsLock.Lock()
|
||||
defer b.resultsLock.Unlock()
|
||||
@@ -153,7 +149,8 @@ func resultsKey(ns, name string) string {
|
||||
|
||||
func (b *backupper) getMatchAction(resPolicies *resourcepolicies.Policies, pvc *corev1api.PersistentVolumeClaim, volume *corev1api.Volume) (*resourcepolicies.Action, error) {
|
||||
if pvc != nil {
|
||||
pv, err := b.pvClient.PersistentVolumes().Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{})
|
||||
pv := new(corev1api.PersistentVolume)
|
||||
err := b.crClient.Get(context.TODO(), ctrlclient.ObjectKey{Name: pvc.Spec.VolumeName}, pv)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "error getting pv for pvc %s", pvc.Spec.VolumeName)
|
||||
}
|
||||
@@ -173,7 +170,7 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
|
||||
}
|
||||
log.Infof("pod %s/%s has volumes to backup: %v", pod.Namespace, pod.Name, volumesToBackup)
|
||||
|
||||
err := nodeagent.IsRunningInNode(b.ctx, backup.Namespace, pod.Spec.NodeName, b.podClient)
|
||||
err := nodeagent.IsRunningInNode(b.ctx, backup.Namespace, pod.Spec.NodeName, b.crClient)
|
||||
if err != nil {
|
||||
return nil, nil, []error{err}
|
||||
}
|
||||
@@ -213,7 +210,8 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
|
||||
for _, podVolume := range pod.Spec.Volumes {
|
||||
podVolumes[podVolume.Name] = podVolume
|
||||
if podVolume.PersistentVolumeClaim != nil {
|
||||
pvc, err := b.pvcClient.PersistentVolumeClaims(pod.Namespace).Get(context.TODO(), podVolume.PersistentVolumeClaim.ClaimName, metav1.GetOptions{})
|
||||
pvc := new(corev1api.PersistentVolumeClaim)
|
||||
err := b.crClient.Get(context.TODO(), ctrlclient.ObjectKey{Namespace: pod.Namespace, Name: podVolume.PersistentVolumeClaim.ClaimName}, pvc)
|
||||
if err != nil {
|
||||
errs = append(errs, errors.Wrap(err, "error getting persistent volume claim for volume"))
|
||||
continue
|
||||
@@ -263,7 +261,7 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
|
||||
|
||||
// hostPath volumes are not supported because they're not mounted into /var/lib/kubelet/pods, so our
|
||||
// daemonset pod has no way to access their data.
|
||||
isHostPath, err := isHostPathVolume(&volume, pvc, b.pvClient.PersistentVolumes())
|
||||
isHostPath, err := isHostPathVolume(&volume, pvc, b.crClient)
|
||||
if err != nil {
|
||||
errs = append(errs, errors.Wrap(err, "error checking if volume is a hostPath volume"))
|
||||
continue
|
||||
@@ -303,11 +301,7 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
|
||||
}
|
||||
|
||||
volumeBackup := newPodVolumeBackup(backup, pod, volume, repoIdentifier, b.uploaderType, pvc)
|
||||
// TODO: once backupper is refactored to use controller-runtime, just pass client instead of anonymous func
|
||||
if err := veleroclient.CreateRetryGenerateNameWithFunc(volumeBackup, func() error {
|
||||
_, err := b.veleroClient.VeleroV1().PodVolumeBackups(volumeBackup.Namespace).Create(context.TODO(), volumeBackup, metav1.CreateOptions{})
|
||||
return err
|
||||
}); err != nil {
|
||||
if err := veleroclient.CreateRetryGenerateName(b.crClient, b.ctx, volumeBackup); err != nil {
|
||||
errs = append(errs, err)
|
||||
continue
|
||||
}
|
||||
@@ -339,13 +333,9 @@ ForEachVolume:
|
||||
return podVolumeBackups, pvcSummary, errs
|
||||
}
|
||||
|
||||
type pvGetter interface {
|
||||
Get(ctx context.Context, name string, opts metav1.GetOptions) (*corev1api.PersistentVolume, error)
|
||||
}
|
||||
|
||||
// isHostPathVolume returns true if the volume is either a hostPath pod volume or a persistent
|
||||
// volume claim on a hostPath persistent volume, or false otherwise.
|
||||
func isHostPathVolume(volume *corev1api.Volume, pvc *corev1api.PersistentVolumeClaim, pvGetter pvGetter) (bool, error) {
|
||||
func isHostPathVolume(volume *corev1api.Volume, pvc *corev1api.PersistentVolumeClaim, crClient ctrlclient.Client) (bool, error) {
|
||||
if volume.HostPath != nil {
|
||||
return true, nil
|
||||
}
|
||||
@@ -354,7 +344,8 @@ func isHostPathVolume(volume *corev1api.Volume, pvc *corev1api.PersistentVolumeC
|
||||
return false, nil
|
||||
}
|
||||
|
||||
pv, err := pvGetter.Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{})
|
||||
pv := new(corev1api.PersistentVolume)
|
||||
err := crClient.Get(context.TODO(), ctrlclient.ObjectKey{Name: pvc.Spec.VolumeName}, pv)
|
||||
if err != nil {
|
||||
return false, errors.WithStack(err)
|
||||
}
|
||||
@@ -421,7 +412,3 @@ func newPodVolumeBackup(backup *velerov1api.Backup, pod *corev1api.Pod, volume c
|
||||
|
||||
return pvb
|
||||
}
|
||||
|
||||
func errorOnly(_ interface{}, err error) error {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -18,17 +18,14 @@ package podvolume
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
clientset "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
|
||||
velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository"
|
||||
)
|
||||
|
||||
@@ -41,48 +38,31 @@ type BackupperFactory interface {
|
||||
func NewBackupperFactory(
|
||||
repoLocker *repository.RepoLocker,
|
||||
repoEnsurer *repository.Ensurer,
|
||||
veleroClient clientset.Interface,
|
||||
pvcClient corev1client.PersistentVolumeClaimsGetter,
|
||||
pvClient corev1client.PersistentVolumesGetter,
|
||||
podClient corev1client.PodsGetter,
|
||||
crClient ctrlclient.Client,
|
||||
pvbInformer ctrlcache.Informer,
|
||||
log logrus.FieldLogger,
|
||||
) BackupperFactory {
|
||||
return &backupperFactory{
|
||||
repoLocker: repoLocker,
|
||||
repoEnsurer: repoEnsurer,
|
||||
veleroClient: veleroClient,
|
||||
pvcClient: pvcClient,
|
||||
pvClient: pvClient,
|
||||
podClient: podClient,
|
||||
log: log,
|
||||
repoLocker: repoLocker,
|
||||
repoEnsurer: repoEnsurer,
|
||||
crClient: crClient,
|
||||
pvbInformer: pvbInformer,
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
type backupperFactory struct {
|
||||
repoLocker *repository.RepoLocker
|
||||
repoEnsurer *repository.Ensurer
|
||||
veleroClient clientset.Interface
|
||||
pvcClient corev1client.PersistentVolumeClaimsGetter
|
||||
pvClient corev1client.PersistentVolumesGetter
|
||||
podClient corev1client.PodsGetter
|
||||
log logrus.FieldLogger
|
||||
repoLocker *repository.RepoLocker
|
||||
repoEnsurer *repository.Ensurer
|
||||
crClient ctrlclient.Client
|
||||
pvbInformer ctrlcache.Informer
|
||||
log logrus.FieldLogger
|
||||
}
|
||||
|
||||
func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
|
||||
informer := velerov1informers.NewFilteredPodVolumeBackupInformer(
|
||||
bf.veleroClient,
|
||||
backup.Namespace,
|
||||
0,
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
func(opts *metav1.ListOptions) {
|
||||
opts.LabelSelector = fmt.Sprintf("%s=%s", velerov1api.BackupUIDLabel, backup.UID)
|
||||
},
|
||||
)
|
||||
b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup, bf.log)
|
||||
|
||||
b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, informer, bf.veleroClient, bf.pvcClient, bf.pvClient, bf.podClient, uploaderType, bf.log)
|
||||
|
||||
go informer.Run(ctx.Done())
|
||||
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
|
||||
if !cache.WaitForCacheSync(ctx.Done(), bf.pvbInformer.HasSynced) {
|
||||
return nil, errors.New("timed out waiting for caches to sync")
|
||||
}
|
||||
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -32,17 +31,15 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
|
||||
"k8s.io/client-go/kubernetes"
|
||||
kubefake "k8s.io/client-go/kubernetes/fake"
|
||||
clientTesting "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/resourcepolicies"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/builder"
|
||||
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
|
||||
velerofake "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository"
|
||||
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
)
|
||||
|
||||
func TestIsHostPathVolume(t *testing.T) {
|
||||
@@ -101,15 +98,14 @@ func TestIsHostPathVolume(t *testing.T) {
|
||||
VolumeName: "pv-1",
|
||||
},
|
||||
}
|
||||
pvGetter := &fakePVGetter{
|
||||
pv: &corev1api.PersistentVolume{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pv-1",
|
||||
},
|
||||
Spec: corev1api.PersistentVolumeSpec{},
|
||||
pv := &corev1api.PersistentVolume{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pv-1",
|
||||
},
|
||||
Spec: corev1api.PersistentVolumeSpec{},
|
||||
}
|
||||
isHostPath, err = isHostPathVolume(vol, pvc, pvGetter)
|
||||
crClient1 := velerotest.NewFakeControllerRuntimeClient(t, pv)
|
||||
isHostPath, err = isHostPathVolume(vol, pvc, crClient1)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, isHostPath)
|
||||
|
||||
@@ -130,35 +126,23 @@ func TestIsHostPathVolume(t *testing.T) {
|
||||
VolumeName: "pv-1",
|
||||
},
|
||||
}
|
||||
pvGetter = &fakePVGetter{
|
||||
pv: &corev1api.PersistentVolume{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pv-1",
|
||||
},
|
||||
Spec: corev1api.PersistentVolumeSpec{
|
||||
PersistentVolumeSource: corev1api.PersistentVolumeSource{
|
||||
HostPath: &corev1api.HostPathVolumeSource{},
|
||||
},
|
||||
pv = &corev1api.PersistentVolume{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pv-1",
|
||||
},
|
||||
Spec: corev1api.PersistentVolumeSpec{
|
||||
PersistentVolumeSource: corev1api.PersistentVolumeSource{
|
||||
HostPath: &corev1api.HostPathVolumeSource{},
|
||||
},
|
||||
},
|
||||
}
|
||||
isHostPath, err = isHostPathVolume(vol, pvc, pvGetter)
|
||||
crClient2 := velerotest.NewFakeControllerRuntimeClient(t, pv)
|
||||
|
||||
isHostPath, err = isHostPathVolume(vol, pvc, crClient2)
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, isHostPath)
|
||||
}
|
||||
|
||||
type fakePVGetter struct {
|
||||
pv *corev1api.PersistentVolume
|
||||
}
|
||||
|
||||
func (g *fakePVGetter) Get(ctx context.Context, name string, opts metav1.GetOptions) (*corev1api.PersistentVolume, error) {
|
||||
if g.pv != nil {
|
||||
return g.pv, nil
|
||||
}
|
||||
|
||||
return nil, errors.New("item not found")
|
||||
}
|
||||
|
||||
func Test_backupper_BackupPodVolumes_log_test(t *testing.T) {
|
||||
type args struct {
|
||||
backup *velerov1api.Backup
|
||||
@@ -322,6 +306,7 @@ func createPVBObj(fail bool, withSnapshot bool, index int, uploaderType string)
|
||||
func TestBackupPodVolumes(t *testing.T) {
|
||||
scheme := runtime.NewScheme()
|
||||
velerov1api.AddToScheme(scheme)
|
||||
corev1api.AddToScheme(scheme)
|
||||
|
||||
ctxWithCancel, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
@@ -509,40 +494,6 @@ func TestBackupPodVolumes(t *testing.T) {
|
||||
uploaderType: "kopia",
|
||||
bsl: "fake-bsl",
|
||||
},
|
||||
{
|
||||
name: "create PVB fail",
|
||||
volumes: []string{
|
||||
"fake-volume-1",
|
||||
"fake-volume-2",
|
||||
},
|
||||
sourcePod: createPodObj(true, true, true, 2),
|
||||
kubeClientObj: []runtime.Object{
|
||||
createNodeAgentPodObj(true),
|
||||
createPVCObj(1),
|
||||
createPVCObj(2),
|
||||
createPVObj(1, false),
|
||||
createPVObj(2, false),
|
||||
},
|
||||
ctlClientObj: []runtime.Object{
|
||||
createBackupRepoObj(),
|
||||
},
|
||||
veleroReactors: []reactor{
|
||||
{
|
||||
verb: "create",
|
||||
resource: "podvolumebackups",
|
||||
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
|
||||
return true, nil, errors.New("fake-create-error")
|
||||
},
|
||||
},
|
||||
},
|
||||
runtimeScheme: scheme,
|
||||
uploaderType: "kopia",
|
||||
bsl: "fake-bsl",
|
||||
errs: []string{
|
||||
"fake-create-error",
|
||||
"fake-create-error",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "context cancelled",
|
||||
ctx: ctxWithCancel,
|
||||
@@ -630,23 +581,28 @@ func TestBackupPodVolumes(t *testing.T) {
|
||||
fakeClientBuilder = fakeClientBuilder.WithScheme(test.runtimeScheme)
|
||||
}
|
||||
|
||||
fakeCtlClient := fakeClientBuilder.WithRuntimeObjects(test.ctlClientObj...).Build()
|
||||
objList := append(test.ctlClientObj, test.veleroClientObj...)
|
||||
objList = append(objList, test.kubeClientObj...)
|
||||
fakeCtrlClient := fakeClientBuilder.WithRuntimeObjects(objList...).Build()
|
||||
|
||||
fakeKubeClient := kubefake.NewSimpleClientset(test.kubeClientObj...)
|
||||
var kubeClient kubernetes.Interface = fakeKubeClient
|
||||
|
||||
fakeVeleroClient := velerofake.NewSimpleClientset(test.veleroClientObj...)
|
||||
for _, reactor := range test.veleroReactors {
|
||||
fakeVeleroClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
|
||||
fakeCRWatchClient := velerotest.NewFakeControllerRuntimeWatchClient(t, test.kubeClientObj...)
|
||||
lw := kube.InternalLW{
|
||||
Client: fakeCRWatchClient,
|
||||
Namespace: velerov1api.DefaultNamespace,
|
||||
ObjectList: new(velerov1api.PodVolumeBackupList),
|
||||
}
|
||||
var veleroClient versioned.Interface = fakeVeleroClient
|
||||
|
||||
ensurer := repository.NewEnsurer(fakeCtlClient, velerotest.NewLogger(), time.Millisecond)
|
||||
pvbInformer := cache.NewSharedIndexInformer(&lw, &velerov1api.PodVolumeBackup{}, 0, cache.Indexers{})
|
||||
|
||||
go pvbInformer.Run(ctx.Done())
|
||||
require.True(t, cache.WaitForCacheSync(ctx.Done(), pvbInformer.HasSynced))
|
||||
|
||||
ensurer := repository.NewEnsurer(fakeCtrlClient, velerotest.NewLogger(), time.Millisecond)
|
||||
|
||||
backupObj := builder.ForBackup(velerov1api.DefaultNamespace, "fake-backup").Result()
|
||||
backupObj.Spec.StorageLocation = test.bsl
|
||||
|
||||
factory := NewBackupperFactory(repository.NewRepoLocker(), ensurer, veleroClient, kubeClient.CoreV1(), kubeClient.CoreV1(), kubeClient.CoreV1(), velerotest.NewLogger())
|
||||
factory := NewBackupperFactory(repository.NewRepoLocker(), ensurer, fakeCtrlClient, pvbInformer, velerotest.NewLogger())
|
||||
bp, err := factory.NewBackupper(ctx, backupObj, test.uploaderType)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -27,12 +27,12 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
veleroclient "github.com/vmware-tanzu/velero/pkg/client"
|
||||
clientset "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
|
||||
"github.com/vmware-tanzu/velero/pkg/label"
|
||||
"github.com/vmware-tanzu/velero/pkg/nodeagent"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository"
|
||||
@@ -54,13 +54,11 @@ type Restorer interface {
|
||||
}
|
||||
|
||||
type restorer struct {
|
||||
ctx context.Context
|
||||
repoLocker *repository.RepoLocker
|
||||
repoEnsurer *repository.Ensurer
|
||||
veleroClient clientset.Interface
|
||||
pvcClient corev1client.PersistentVolumeClaimsGetter
|
||||
podClient corev1client.PodsGetter
|
||||
kubeClient kubernetes.Interface
|
||||
ctx context.Context
|
||||
repoLocker *repository.RepoLocker
|
||||
repoEnsurer *repository.Ensurer
|
||||
kubeClient kubernetes.Interface
|
||||
crClient ctrlclient.Client
|
||||
|
||||
resultsLock sync.Mutex
|
||||
results map[string]chan *velerov1api.PodVolumeRestore
|
||||
@@ -72,30 +70,30 @@ func newRestorer(
|
||||
ctx context.Context,
|
||||
repoLocker *repository.RepoLocker,
|
||||
repoEnsurer *repository.Ensurer,
|
||||
podVolumeRestoreInformer cache.SharedIndexInformer,
|
||||
veleroClient clientset.Interface,
|
||||
pvcClient corev1client.PersistentVolumeClaimsGetter,
|
||||
podClient corev1client.PodsGetter,
|
||||
pvrInformer ctrlcache.Informer,
|
||||
kubeClient kubernetes.Interface,
|
||||
crClient ctrlclient.Client,
|
||||
restore *velerov1api.Restore,
|
||||
log logrus.FieldLogger,
|
||||
) *restorer {
|
||||
r := &restorer{
|
||||
ctx: ctx,
|
||||
repoLocker: repoLocker,
|
||||
repoEnsurer: repoEnsurer,
|
||||
veleroClient: veleroClient,
|
||||
pvcClient: pvcClient,
|
||||
podClient: podClient,
|
||||
kubeClient: kubeClient,
|
||||
ctx: ctx,
|
||||
repoLocker: repoLocker,
|
||||
repoEnsurer: repoEnsurer,
|
||||
kubeClient: kubeClient,
|
||||
crClient: crClient,
|
||||
|
||||
results: make(map[string]chan *velerov1api.PodVolumeRestore),
|
||||
log: log,
|
||||
}
|
||||
|
||||
podVolumeRestoreInformer.AddEventHandler(
|
||||
pvrInformer.AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(_, obj interface{}) {
|
||||
pvr := obj.(*velerov1api.PodVolumeRestore)
|
||||
if pvr.GetLabels()[velerov1api.RestoreUIDLabel] != string(restore.UID) {
|
||||
return
|
||||
}
|
||||
|
||||
if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseCompleted || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseFailed {
|
||||
r.resultsLock.Lock()
|
||||
@@ -169,7 +167,8 @@ func (r *restorer) RestorePodVolumes(data RestoreData) []error {
|
||||
var pvc *corev1api.PersistentVolumeClaim
|
||||
if ok {
|
||||
if volumeObj.PersistentVolumeClaim != nil {
|
||||
pvc, err = r.pvcClient.PersistentVolumeClaims(data.Pod.Namespace).Get(context.TODO(), volumeObj.PersistentVolumeClaim.ClaimName, metav1.GetOptions{})
|
||||
pvc := new(corev1api.PersistentVolumeClaim)
|
||||
err := r.crClient.Get(context.TODO(), ctrlclient.ObjectKey{Namespace: data.Pod.Namespace, Name: volumeObj.PersistentVolumeClaim.ClaimName}, pvc)
|
||||
if err != nil {
|
||||
errs = append(errs, errors.Wrap(err, "error getting persistent volume claim for volume"))
|
||||
continue
|
||||
@@ -179,10 +178,7 @@ func (r *restorer) RestorePodVolumes(data RestoreData) []error {
|
||||
|
||||
volumeRestore := newPodVolumeRestore(data.Restore, data.Pod, data.BackupLocation, volume, backupInfo.snapshotID, repoIdentifier, backupInfo.uploaderType, data.SourceNamespace, pvc)
|
||||
|
||||
// TODO: once restorer is refactored to use controller-runtime, just pass client instead of anonymous func
|
||||
if err := veleroclient.CreateRetryGenerateNameWithFunc(volumeRestore, func() error {
|
||||
return errorOnly(r.veleroClient.VeleroV1().PodVolumeRestores(volumeRestore.Namespace).Create(context.TODO(), volumeRestore, metav1.CreateOptions{}))
|
||||
}); err != nil {
|
||||
if err := veleroclient.CreateRetryGenerateName(r.crClient, r.ctx, volumeRestore); err != nil {
|
||||
errs = append(errs, errors.WithStack(err))
|
||||
continue
|
||||
}
|
||||
@@ -214,7 +210,7 @@ func (r *restorer) RestorePodVolumes(data RestoreData) []error {
|
||||
} else if err != nil {
|
||||
r.log.WithError(err).Error("Failed to check node-agent pod status, disengage")
|
||||
} else {
|
||||
err = nodeagent.IsRunningInNode(checkCtx, data.Restore.Namespace, nodeName, r.podClient)
|
||||
err = nodeagent.IsRunningInNode(checkCtx, data.Restore.Namespace, nodeName, r.crClient)
|
||||
if err != nil {
|
||||
r.log.WithField("node", nodeName).WithError(err).Error("node-agent pod is not running in node, abort the restore")
|
||||
r.nodeAgentCheck <- errors.Wrapf(err, "node-agent pod is not running in node %s", nodeName)
|
||||
|
||||
@@ -18,18 +18,15 @@ package podvolume
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
clientset "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
|
||||
velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository"
|
||||
)
|
||||
|
||||
@@ -41,47 +38,33 @@ type RestorerFactory interface {
|
||||
|
||||
func NewRestorerFactory(repoLocker *repository.RepoLocker,
|
||||
repoEnsurer *repository.Ensurer,
|
||||
veleroClient clientset.Interface,
|
||||
pvcClient corev1client.PersistentVolumeClaimsGetter,
|
||||
podClient corev1client.PodsGetter,
|
||||
kubeClient kubernetes.Interface,
|
||||
crClient ctrlclient.Client,
|
||||
pvrInformer ctrlcache.Informer,
|
||||
log logrus.FieldLogger) RestorerFactory {
|
||||
return &restorerFactory{
|
||||
repoLocker: repoLocker,
|
||||
repoEnsurer: repoEnsurer,
|
||||
veleroClient: veleroClient,
|
||||
pvcClient: pvcClient,
|
||||
podClient: podClient,
|
||||
kubeClient: kubeClient,
|
||||
log: log,
|
||||
repoLocker: repoLocker,
|
||||
repoEnsurer: repoEnsurer,
|
||||
kubeClient: kubeClient,
|
||||
crClient: crClient,
|
||||
pvrInformer: pvrInformer,
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
type restorerFactory struct {
|
||||
repoLocker *repository.RepoLocker
|
||||
repoEnsurer *repository.Ensurer
|
||||
veleroClient clientset.Interface
|
||||
pvcClient corev1client.PersistentVolumeClaimsGetter
|
||||
podClient corev1client.PodsGetter
|
||||
kubeClient kubernetes.Interface
|
||||
log logrus.FieldLogger
|
||||
repoLocker *repository.RepoLocker
|
||||
repoEnsurer *repository.Ensurer
|
||||
kubeClient kubernetes.Interface
|
||||
crClient ctrlclient.Client
|
||||
pvrInformer ctrlcache.Informer
|
||||
log logrus.FieldLogger
|
||||
}
|
||||
|
||||
func (rf *restorerFactory) NewRestorer(ctx context.Context, restore *velerov1api.Restore) (Restorer, error) {
|
||||
informer := velerov1informers.NewFilteredPodVolumeRestoreInformer(
|
||||
rf.veleroClient,
|
||||
restore.Namespace,
|
||||
0,
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
func(opts *metav1.ListOptions) {
|
||||
opts.LabelSelector = fmt.Sprintf("%s=%s", velerov1api.RestoreUIDLabel, restore.UID)
|
||||
},
|
||||
)
|
||||
r := newRestorer(ctx, rf.repoLocker, rf.repoEnsurer, rf.pvrInformer, rf.kubeClient, rf.crClient, restore, rf.log)
|
||||
|
||||
r := newRestorer(ctx, rf.repoLocker, rf.repoEnsurer, informer, rf.veleroClient, rf.pvcClient, rf.podClient, rf.kubeClient, rf.log)
|
||||
|
||||
go informer.Run(ctx.Done())
|
||||
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
|
||||
if !cache.WaitForCacheSync(ctx.Done(), rf.pvrInformer.HasSynced) {
|
||||
return nil, errors.New("timed out waiting for cache to sync")
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
appv1 "k8s.io/api/apps/v1"
|
||||
@@ -31,15 +30,14 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
kubefake "k8s.io/client-go/kubernetes/fake"
|
||||
clientTesting "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/builder"
|
||||
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
|
||||
velerofake "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository"
|
||||
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
)
|
||||
|
||||
func TestGetVolumesRepositoryType(t *testing.T) {
|
||||
@@ -162,6 +160,7 @@ type expectError struct {
|
||||
func TestRestorePodVolumes(t *testing.T) {
|
||||
scheme := runtime.NewScheme()
|
||||
velerov1api.AddToScheme(scheme)
|
||||
corev1api.AddToScheme(scheme)
|
||||
|
||||
ctxWithCancel, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
@@ -264,42 +263,6 @@ func TestRestorePodVolumes(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "create pvb fail",
|
||||
pvbs: []*velerov1api.PodVolumeBackup{
|
||||
createPVBObj(true, true, 1, "kopia"),
|
||||
createPVBObj(true, true, 2, "kopia"),
|
||||
},
|
||||
kubeClientObj: []runtime.Object{
|
||||
createNodeAgentDaemonset(),
|
||||
createPVCObj(1),
|
||||
createPVCObj(2),
|
||||
},
|
||||
ctlClientObj: []runtime.Object{
|
||||
createBackupRepoObj(),
|
||||
},
|
||||
veleroReactors: []reactor{
|
||||
{
|
||||
verb: "create",
|
||||
resource: "podvolumerestores",
|
||||
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
|
||||
return true, nil, errors.New("fake-create-error")
|
||||
},
|
||||
},
|
||||
},
|
||||
restoredPod: createPodObj(true, true, true, 2),
|
||||
sourceNamespace: "fake-ns",
|
||||
bsl: "fake-bsl",
|
||||
runtimeScheme: scheme,
|
||||
errs: []expectError{
|
||||
{
|
||||
err: "fake-create-error",
|
||||
},
|
||||
{
|
||||
err: "fake-create-error",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "create pvb fail",
|
||||
ctx: ctxWithCancel,
|
||||
@@ -407,22 +370,32 @@ func TestRestorePodVolumes(t *testing.T) {
|
||||
fakeClientBuilder = fakeClientBuilder.WithScheme(test.runtimeScheme)
|
||||
}
|
||||
|
||||
fakeCtlClient := fakeClientBuilder.WithRuntimeObjects(test.ctlClientObj...).Build()
|
||||
objClient := append(test.ctlClientObj, test.kubeClientObj...)
|
||||
objClient = append(objClient, test.veleroClientObj...)
|
||||
|
||||
fakeCRClient := velerotest.NewFakeControllerRuntimeClient(t, objClient...)
|
||||
|
||||
fakeKubeClient := kubefake.NewSimpleClientset(test.kubeClientObj...)
|
||||
var kubeClient kubernetes.Interface = fakeKubeClient
|
||||
|
||||
fakeVeleroClient := velerofake.NewSimpleClientset(test.veleroClientObj...)
|
||||
for _, reactor := range test.veleroReactors {
|
||||
fakeVeleroClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
|
||||
fakeCRWatchClient := velerotest.NewFakeControllerRuntimeWatchClient(t, test.kubeClientObj...)
|
||||
lw := kube.InternalLW{
|
||||
Client: fakeCRWatchClient,
|
||||
Namespace: velerov1api.DefaultNamespace,
|
||||
ObjectList: new(velerov1api.PodVolumeRestoreList),
|
||||
}
|
||||
var veleroClient versioned.Interface = fakeVeleroClient
|
||||
|
||||
ensurer := repository.NewEnsurer(fakeCtlClient, velerotest.NewLogger(), time.Millisecond)
|
||||
pvrInformer := cache.NewSharedIndexInformer(&lw, &velerov1api.PodVolumeBackup{}, 0, cache.Indexers{})
|
||||
|
||||
go pvrInformer.Run(ctx.Done())
|
||||
require.True(t, cache.WaitForCacheSync(ctx.Done(), pvrInformer.HasSynced))
|
||||
|
||||
ensurer := repository.NewEnsurer(fakeCRClient, velerotest.NewLogger(), time.Millisecond)
|
||||
|
||||
restoreObj := builder.ForRestore(velerov1api.DefaultNamespace, "fake-restore").Result()
|
||||
|
||||
factory := NewRestorerFactory(repository.NewRepoLocker(), ensurer, veleroClient, kubeClient.CoreV1(), kubeClient.CoreV1(), kubeClient, velerotest.NewLogger())
|
||||
factory := NewRestorerFactory(repository.NewRepoLocker(), ensurer, kubeClient,
|
||||
fakeCRClient, pvrInformer, velerotest.NewLogger())
|
||||
rs, err := factory.NewRestorer(ctx, restoreObj)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -28,8 +28,6 @@ import (
|
||||
|
||||
api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/builder"
|
||||
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake"
|
||||
informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions"
|
||||
providermocks "github.com/vmware-tanzu/velero/pkg/plugin/velero/mocks/volumesnapshotter/v1"
|
||||
vsv1 "github.com/vmware-tanzu/velero/pkg/plugin/velero/volumesnapshotter/v1"
|
||||
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||
@@ -116,11 +114,6 @@ func TestExecutePVAction_NoSnapshotRestores(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
var (
|
||||
client = fake.NewSimpleClientset()
|
||||
snapshotLocationInformer = informers.NewSharedInformerFactory(client, 0).Velero().V1().VolumeSnapshotLocations()
|
||||
)
|
||||
|
||||
r := &pvRestorer{
|
||||
logger: velerotest.NewLogger(),
|
||||
restorePVs: tc.restore.Spec.RestorePVs,
|
||||
@@ -132,7 +125,7 @@ func TestExecutePVAction_NoSnapshotRestores(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, loc := range tc.locations {
|
||||
require.NoError(t, snapshotLocationInformer.Informer().GetStore().Add(loc))
|
||||
require.NoError(t, r.kbclient.Create(context.TODO(), loc))
|
||||
}
|
||||
|
||||
res, err := r.executePVAction(tc.obj)
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
|
||||
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
|
||||
"github.com/stretchr/testify/require"
|
||||
appsv1api "k8s.io/api/apps/v1"
|
||||
corev1api "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
@@ -38,6 +39,8 @@ func NewFakeControllerRuntimeClientBuilder(t *testing.T) *k8sfake.ClientBuilder
|
||||
require.NoError(t, err)
|
||||
err = corev1api.AddToScheme(scheme)
|
||||
require.NoError(t, err)
|
||||
err = appsv1api.AddToScheme(scheme)
|
||||
require.NoError(t, err)
|
||||
err = snapshotv1api.AddToScheme(scheme)
|
||||
require.NoError(t, err)
|
||||
return k8sfake.NewClientBuilder().WithScheme(scheme)
|
||||
@@ -51,7 +54,13 @@ func NewFakeControllerRuntimeClient(t *testing.T, initObjs ...runtime.Object) cl
|
||||
require.NoError(t, err)
|
||||
err = corev1api.AddToScheme(scheme)
|
||||
require.NoError(t, err)
|
||||
err = appsv1api.AddToScheme(scheme)
|
||||
require.NoError(t, err)
|
||||
err = snapshotv1api.AddToScheme(scheme)
|
||||
require.NoError(t, err)
|
||||
return k8sfake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(initObjs...).Build()
|
||||
}
|
||||
|
||||
func NewFakeControllerRuntimeWatchClient(t *testing.T, initObjs ...runtime.Object) client.WithWatch {
|
||||
return NewFakeControllerRuntimeClientBuilder(t).WithRuntimeObjects(initObjs...).Build()
|
||||
}
|
||||
|
||||
@@ -35,12 +35,12 @@ import (
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
"github.com/vmware-tanzu/velero/internal/credentials/mocks"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository"
|
||||
udmrepo "github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
|
||||
udmrepomocks "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/mocks"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader/kopia"
|
||||
"github.com/vmware-tanzu/velero/pkg/util"
|
||||
)
|
||||
|
||||
type FakeBackupProgressUpdater struct {
|
||||
@@ -64,7 +64,7 @@ func (f *FakeRestoreProgressUpdater) UpdateProgress(p *uploader.Progress) {}
|
||||
func TestRunBackup(t *testing.T) {
|
||||
var kp kopiaProvider
|
||||
kp.log = logrus.New()
|
||||
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()}
|
||||
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(util.VeleroScheme).Build()}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
@@ -121,7 +121,7 @@ func TestRunBackup(t *testing.T) {
|
||||
func TestRunRestore(t *testing.T) {
|
||||
var kp kopiaProvider
|
||||
kp.log = logrus.New()
|
||||
updater := FakeRestoreProgressUpdater{PodVolumeRestore: &velerov1api.PodVolumeRestore{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()}
|
||||
updater := FakeRestoreProgressUpdater{PodVolumeRestore: &velerov1api.PodVolumeRestore{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(util.VeleroScheme).Build()}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
|
||||
@@ -28,7 +28,7 @@ import (
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
"github.com/vmware-tanzu/velero/internal/credentials/mocks"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
|
||||
"github.com/vmware-tanzu/velero/pkg/util"
|
||||
)
|
||||
|
||||
type NewUploaderProviderTestCase struct {
|
||||
@@ -42,7 +42,7 @@ type NewUploaderProviderTestCase struct {
|
||||
func TestNewUploaderProvider(t *testing.T) {
|
||||
// Mock objects or dependencies
|
||||
ctx := context.Background()
|
||||
client := fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()
|
||||
client := fake.NewClientBuilder().WithScheme(util.VeleroScheme).Build()
|
||||
repoIdentifier := "repoIdentifier"
|
||||
bsl := &velerov1api.BackupStorageLocation{}
|
||||
backupRepo := &velerov1api.BackupRepository{}
|
||||
|
||||
@@ -33,9 +33,9 @@ import (
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/builder"
|
||||
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
|
||||
"github.com/vmware-tanzu/velero/pkg/restic"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/util"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
)
|
||||
|
||||
@@ -148,7 +148,7 @@ func TestResticRunBackup(t *testing.T) {
|
||||
tc.volMode = uploader.PersistentVolumeFilesystem
|
||||
}
|
||||
if !tc.nilUpdater {
|
||||
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: tc.rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()}
|
||||
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: tc.rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(util.VeleroScheme).Build()}
|
||||
_, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, &updater)
|
||||
} else {
|
||||
_, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, nil)
|
||||
@@ -221,7 +221,7 @@ func TestResticRunRestore(t *testing.T) {
|
||||
}
|
||||
var err error
|
||||
if !tc.nilUpdater {
|
||||
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: tc.rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()}
|
||||
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: tc.rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(util.VeleroScheme).Build()}
|
||||
err = tc.rp.RunRestore(context.Background(), "", "var", tc.volMode, &updater)
|
||||
} else {
|
||||
err = tc.rp.RunRestore(context.Background(), "", "var", tc.volMode, nil)
|
||||
|
||||
@@ -25,9 +25,10 @@ import (
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
|
||||
v1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
|
||||
"github.com/vmware-tanzu/velero/pkg/util"
|
||||
)
|
||||
|
||||
// Encode converts the provided object to the specified format
|
||||
@@ -56,8 +57,11 @@ func To(obj runtime.Object, format string, w io.Writer) error {
|
||||
// Only objects registered in the velero scheme, or objects with their TypeMeta set will have valid encoders.
|
||||
func EncoderFor(format string, obj runtime.Object) (runtime.Encoder, error) {
|
||||
var encoder runtime.Encoder
|
||||
|
||||
codecFactory := serializer.NewCodecFactory(util.VeleroScheme)
|
||||
|
||||
desiredMediaType := fmt.Sprintf("application/%s", format)
|
||||
serializerInfo, found := runtime.SerializerInfoForMediaType(scheme.Codecs.SupportedMediaTypes(), desiredMediaType)
|
||||
serializerInfo, found := runtime.SerializerInfoForMediaType(codecFactory.SupportedMediaTypes(), desiredMediaType)
|
||||
if !found {
|
||||
return nil, errors.Errorf("unable to locate an encoder for %q", desiredMediaType)
|
||||
}
|
||||
@@ -69,7 +73,7 @@ func EncoderFor(format string, obj runtime.Object) (runtime.Encoder, error) {
|
||||
if !obj.GetObjectKind().GroupVersionKind().Empty() {
|
||||
return encoder, nil
|
||||
}
|
||||
encoder = scheme.Codecs.EncoderForVersion(encoder, v1.SchemeGroupVersion)
|
||||
encoder = codecFactory.EncoderForVersion(encoder, v1.SchemeGroupVersion)
|
||||
return encoder, nil
|
||||
}
|
||||
|
||||
|
||||
19
pkg/util/scheme.go
Normal file
19
pkg/util/scheme.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package util
|
||||
|
||||
import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
|
||||
v1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
|
||||
)
|
||||
|
||||
var VeleroScheme = runtime.NewScheme()
|
||||
|
||||
func init() {
|
||||
localSchemeBuilder := runtime.SchemeBuilder{
|
||||
v1.AddToScheme,
|
||||
v2alpha1.AddToScheme,
|
||||
}
|
||||
utilruntime.Must(localSchemeBuilder.AddToScheme(VeleroScheme))
|
||||
}
|
||||
Reference in New Issue
Block a user