mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-04-28 11:27:00 +00:00
Remove Restic code path from PodVolumeRestore.
Signed-off-by: Xun Jiang <xun.jiang@broadcom.com>
This commit is contained in:
1
changelogs/unreleased/9732-blackpiglet
Normal file
1
changelogs/unreleased/9732-blackpiglet
Normal file
@@ -0,0 +1 @@
|
||||
Remove Restic code path from PodVolumeRestore.
|
||||
@@ -35,7 +35,7 @@ func main() {
|
||||
for {
|
||||
<-ticker.C
|
||||
if done() {
|
||||
fmt.Println("All restic restores are done")
|
||||
fmt.Println("All PodVolumeRestores are done")
|
||||
err := removeFolder()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
|
||||
@@ -38,7 +38,7 @@ const (
|
||||
ConfigmapRefType string = "configmap"
|
||||
// skip action implies the volume would be skipped from the backup operation
|
||||
Skip VolumeActionType = "skip"
|
||||
// fs-backup action implies that the volume would be backed up via file system copy method using the uploader(kopia/restic) configured by the user
|
||||
// fs-backup action implies that the volume would be backed up via file system copy method using the uploader(kopia) configured by the user
|
||||
FSBackup VolumeActionType = "fs-backup"
|
||||
// snapshot action can have 3 different meaning based on velero configuration and backup spec - cloud provider based snapshots, local csi snapshots and datamover snapshots
|
||||
Snapshot VolumeActionType = "snapshot"
|
||||
|
||||
@@ -37,7 +37,6 @@ import (
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
cacheutil "k8s.io/client-go/tools/cache"
|
||||
@@ -430,10 +429,6 @@ func (s *nodeAgentServer) run() {
|
||||
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
|
||||
}
|
||||
|
||||
if err := controller.InitLegacyPodVolumeRestoreReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.namespace, s.config.resourceTimeout, s.logger); err != nil {
|
||||
s.logger.WithError(err).Fatal("Unable to create the legacy pod volume restore controller")
|
||||
}
|
||||
|
||||
dataUploadReconciler := controller.NewDataUploadReconciler(
|
||||
s.mgr.GetClient(),
|
||||
s.mgr,
|
||||
@@ -509,8 +504,6 @@ func (s *nodeAgentServer) run() {
|
||||
if err := pvrReconciler.AttemptPVRResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
|
||||
s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt PVR resume")
|
||||
}
|
||||
|
||||
s.markLegacyPVRsFailed(s.mgr.GetClient())
|
||||
}()
|
||||
|
||||
s.logger.Info("Controllers starting...")
|
||||
@@ -604,47 +597,6 @@ func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *nodeAgentServer) markLegacyPVRsFailed(client ctrlclient.Client) {
|
||||
pvrs := &velerov1api.PodVolumeRestoreList{}
|
||||
if err := client.List(s.ctx, pvrs, &ctrlclient.ListOptions{Namespace: s.namespace}); err != nil {
|
||||
s.logger.WithError(errors.WithStack(err)).Error("failed to list podvolumerestores")
|
||||
return
|
||||
}
|
||||
|
||||
for i, pvr := range pvrs.Items {
|
||||
if !controller.IsLegacyPVR(&pvr) {
|
||||
continue
|
||||
}
|
||||
|
||||
if pvr.Status.Phase != velerov1api.PodVolumeRestorePhaseInProgress {
|
||||
s.logger.Debugf("the status of podvolumerestore %q is %q, skip", pvr.GetName(), pvr.Status.Phase)
|
||||
continue
|
||||
}
|
||||
|
||||
pod := &corev1api.Pod{}
|
||||
if err := client.Get(s.ctx, types.NamespacedName{
|
||||
Namespace: pvr.Spec.Pod.Namespace,
|
||||
Name: pvr.Spec.Pod.Name,
|
||||
}, pod); err != nil {
|
||||
s.logger.WithError(errors.WithStack(err)).Errorf("failed to get pod \"%s/%s\" of podvolumerestore %q",
|
||||
pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name, pvr.GetName())
|
||||
continue
|
||||
}
|
||||
if pod.Spec.NodeName != s.nodeName {
|
||||
s.logger.Debugf("the node of pod referenced by podvolumerestore %q is %q, not %q, skip", pvr.GetName(), pod.Spec.NodeName, s.nodeName)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := controller.UpdatePVRStatusToFailed(s.ctx, client, &pvrs.Items[i], errors.New("cannot survive from node-agent restart"),
|
||||
fmt.Sprintf("get a legacy podvolumerestore with status %q during the server starting, mark it as %q", velerov1api.PodVolumeRestorePhaseInProgress, velerov1api.PodVolumeRestorePhaseFailed),
|
||||
time.Now(), s.logger); err != nil {
|
||||
s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumerestore %q", pvr.GetName())
|
||||
continue
|
||||
}
|
||||
s.logger.WithField("podvolumerestore", pvr.GetName()).Warn(pvr.Status.Message)
|
||||
}
|
||||
}
|
||||
|
||||
var getConfigsFunc = nodeagent.GetConfigs
|
||||
|
||||
func (s *nodeAgentServer) getDataPathConfigs() error {
|
||||
|
||||
@@ -1164,8 +1164,8 @@ func markPodVolumeRestoresCancel(ctx context.Context, client ctrlclient.Client,
|
||||
|
||||
for i := range pvrs.Items {
|
||||
pvr := pvrs.Items[i]
|
||||
if controller.IsLegacyPVR(&pvr) {
|
||||
log.WithField("PVR", pvr.GetName()).Warn("Found a legacy PVR during velero server restart, cannot stop it")
|
||||
if _, err := uploader.ValidateUploaderType(pvr.Spec.UploaderType); err != nil {
|
||||
log.WithField("PVR", pvr.Name).Warnf("invalid uploader type %s, skip marking cancel for this PVR", pvr.Spec.UploaderType)
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -603,7 +603,7 @@ func (r *PodVolumeRestoreReconciler) closeDataPath(ctx context.Context, pvrName
|
||||
func (r *PodVolumeRestoreReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
|
||||
pvr := object.(*velerov1api.PodVolumeRestore)
|
||||
if IsLegacyPVR(pvr) {
|
||||
if _, err := uploader.ValidateUploaderType(pvr.Spec.UploaderType); err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -628,7 +628,8 @@ func (r *PodVolumeRestoreReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
|
||||
pred := kube.NewAllEventPredicate(func(obj client.Object) bool {
|
||||
pvr := obj.(*velerov1api.PodVolumeRestore)
|
||||
return !IsLegacyPVR(pvr)
|
||||
_, err := uploader.ValidateUploaderType(pvr.Spec.UploaderType)
|
||||
return err == nil
|
||||
})
|
||||
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
@@ -678,7 +679,7 @@ func (r *PodVolumeRestoreReconciler) findPVRForTargetPod(ctx context.Context, po
|
||||
|
||||
requests := []reconcile.Request{}
|
||||
for _, item := range list.Items {
|
||||
if IsLegacyPVR(&item) {
|
||||
if _, err := uploader.ValidateUploaderType(item.Spec.UploaderType); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -708,6 +709,11 @@ func (r *PodVolumeRestoreReconciler) findPVRForRestorePod(ctx context.Context, p
|
||||
"PVR": pvr.Name,
|
||||
})
|
||||
|
||||
if _, err := uploader.ValidateUploaderType(pvr.Spec.UploaderType); err != nil {
|
||||
log.WithField("uploaderType", pvr.Spec.UploaderType).Debug("skip PVR with invalid uploader type")
|
||||
return []reconcile.Request{}
|
||||
}
|
||||
|
||||
if pvr.Status.Phase != velerov1api.PodVolumeRestorePhaseAccepted {
|
||||
return []reconcile.Request{}
|
||||
}
|
||||
@@ -1029,7 +1035,7 @@ func (r *PodVolumeRestoreReconciler) AttemptPVRResume(ctx context.Context, logge
|
||||
|
||||
for i := range pvrs.Items {
|
||||
pvr := &pvrs.Items[i]
|
||||
if IsLegacyPVR(pvr) {
|
||||
if _, err := uploader.ValidateUploaderType(pvr.Spec.UploaderType); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -1,364 +0,0 @@
|
||||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
corev1api "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
clocks "k8s.io/utils/clock"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/builder"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
"sigs.k8s.io/controller-runtime/pkg/manager"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
veleroapishared "github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/datapath"
|
||||
"github.com/vmware-tanzu/velero/pkg/exposer"
|
||||
"github.com/vmware-tanzu/velero/pkg/podvolume"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository"
|
||||
"github.com/vmware-tanzu/velero/pkg/restorehelper"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
)
|
||||
|
||||
func InitLegacyPodVolumeRestoreReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, namespace string,
|
||||
resourceTimeout time.Duration, logger logrus.FieldLogger) error {
|
||||
log := logger.WithField("controller", "PodVolumeRestoreLegacy")
|
||||
|
||||
credentialFileStore, err := credentials.NewNamespacedFileStore(client, namespace, credentials.DefaultStoreDirectory(), filesystem.NewFileSystem())
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error creating credentials file store")
|
||||
}
|
||||
|
||||
credSecretStore, err := credentials.NewNamespacedSecretStore(client, namespace)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error creating secret file store")
|
||||
}
|
||||
|
||||
credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}
|
||||
ensurer := repository.NewEnsurer(client, log, resourceTimeout)
|
||||
|
||||
reconciler := &PodVolumeRestoreReconcilerLegacy{
|
||||
Client: client,
|
||||
kubeClient: kubeClient,
|
||||
logger: log,
|
||||
repositoryEnsurer: ensurer,
|
||||
credentialGetter: credentialGetter,
|
||||
fileSystem: filesystem.NewFileSystem(),
|
||||
clock: &clocks.RealClock{},
|
||||
dataPathMgr: dataPathMgr,
|
||||
}
|
||||
|
||||
if err = reconciler.SetupWithManager(mgr); err != nil {
|
||||
return errors.Wrapf(err, "error setup controller manager")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type PodVolumeRestoreReconcilerLegacy struct {
|
||||
client.Client
|
||||
kubeClient kubernetes.Interface
|
||||
logger logrus.FieldLogger
|
||||
repositoryEnsurer *repository.Ensurer
|
||||
credentialGetter *credentials.CredentialGetter
|
||||
fileSystem filesystem.Interface
|
||||
clock clocks.WithTickerAndDelayedExecution
|
||||
dataPathMgr *datapath.Manager
|
||||
}
|
||||
|
||||
// +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores,verbs=get;list;watch;create;update;patch;delete
|
||||
// +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores/status,verbs=get;update;patch
|
||||
// +kubebuilder:rbac:groups="",resources=pods,verbs=get
|
||||
// +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get
|
||||
// +kubebuilder:rbac:groups="",resources=persistentvolumerclaims,verbs=get
|
||||
|
||||
func (c *PodVolumeRestoreReconcilerLegacy) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||||
log := c.logger.WithField("PodVolumeRestore", req.NamespacedName.String())
|
||||
log.Info("Reconciling PVR by legacy controller")
|
||||
|
||||
pvr := &velerov1api.PodVolumeRestore{}
|
||||
if err := c.Get(ctx, types.NamespacedName{Namespace: req.Namespace, Name: req.Name}, pvr); err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
log.Warn("PodVolumeRestore not found, skip")
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
log.WithError(err).Error("Unable to get the PodVolumeRestore")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
log = log.WithField("pod", fmt.Sprintf("%s/%s", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name))
|
||||
if len(pvr.OwnerReferences) == 1 {
|
||||
log = log.WithField("restore", fmt.Sprintf("%s/%s", pvr.Namespace, pvr.OwnerReferences[0].Name))
|
||||
}
|
||||
|
||||
shouldProcess, pod, err := shouldProcess(ctx, c.Client, log, pvr)
|
||||
if err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
if !shouldProcess {
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
initContainerIndex := getInitContainerIndex(pod)
|
||||
if initContainerIndex > 0 {
|
||||
log.Warnf(`Init containers before the %s container may cause issues
|
||||
if they interfere with volumes being restored: %s index %d`, restorehelper.WaitInitContainer, restorehelper.WaitInitContainer, initContainerIndex)
|
||||
}
|
||||
|
||||
log.Info("Restore starting")
|
||||
|
||||
callbacks := datapath.Callbacks{
|
||||
OnCompleted: c.OnDataPathCompleted,
|
||||
OnFailed: c.OnDataPathFailed,
|
||||
OnCancelled: c.OnDataPathCancelled,
|
||||
OnProgress: c.OnDataPathProgress,
|
||||
}
|
||||
|
||||
fsRestore, err := c.dataPathMgr.CreateFileSystemBR(pvr.Name, pVBRRequestor, ctx, c.Client, pvr.Namespace, callbacks, log)
|
||||
if err != nil {
|
||||
if err == datapath.ConcurrentLimitExceed {
|
||||
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
|
||||
} else {
|
||||
return c.errorOut(ctx, pvr, err, "error to create data path", log)
|
||||
}
|
||||
}
|
||||
|
||||
original := pvr.DeepCopy()
|
||||
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseInProgress
|
||||
pvr.Status.StartTimestamp = &metav1.Time{Time: c.clock.Now()}
|
||||
if err = c.Patch(ctx, pvr, client.MergeFrom(original)); err != nil {
|
||||
c.closeDataPath(ctx, pvr.Name)
|
||||
return c.errorOut(ctx, pvr, err, "error to update status to in progress", log)
|
||||
}
|
||||
|
||||
volumePath, err := exposer.GetPodVolumeHostPath(ctx, pod, pvr.Spec.Volume, c.kubeClient, c.fileSystem, log)
|
||||
if err != nil {
|
||||
c.closeDataPath(ctx, pvr.Name)
|
||||
return c.errorOut(ctx, pvr, err, "error exposing host path for pod volume", log)
|
||||
}
|
||||
|
||||
log.WithField("path", volumePath.ByPath).Debugf("Found host path")
|
||||
|
||||
if err := fsRestore.Init(ctx, &datapath.FSBRInitParam{
|
||||
BSLName: pvr.Spec.BackupStorageLocation,
|
||||
SourceNamespace: pvr.Spec.SourceNamespace,
|
||||
UploaderType: pvr.Spec.UploaderType,
|
||||
RepositoryType: podvolume.GetPvrRepositoryType(pvr),
|
||||
RepoIdentifier: pvr.Spec.RepoIdentifier,
|
||||
RepositoryEnsurer: c.repositoryEnsurer,
|
||||
CredentialGetter: c.credentialGetter,
|
||||
}); err != nil {
|
||||
c.closeDataPath(ctx, pvr.Name)
|
||||
return c.errorOut(ctx, pvr, err, "error to initialize data path", log)
|
||||
}
|
||||
|
||||
if err := fsRestore.StartRestore(pvr.Spec.SnapshotID, volumePath, pvr.Spec.UploaderSettings); err != nil {
|
||||
c.closeDataPath(ctx, pvr.Name)
|
||||
return c.errorOut(ctx, pvr, err, "error starting data path restore", log)
|
||||
}
|
||||
|
||||
log.WithField("path", volumePath.ByPath).Info("Async fs restore data path started")
|
||||
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
func (c *PodVolumeRestoreReconcilerLegacy) errorOut(ctx context.Context, pvr *velerov1api.PodVolumeRestore, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
|
||||
_ = UpdatePVRStatusToFailed(ctx, c.Client, pvr, err, msg, c.clock.Now(), log)
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
func (c *PodVolumeRestoreReconcilerLegacy) SetupWithManager(mgr ctrl.Manager) error {
|
||||
// The pod may not being scheduled at the point when its PVRs are initially reconciled.
|
||||
// By watching the pods, we can trigger the PVR reconciliation again once the pod is finally scheduled on the node.
|
||||
pred := kube.NewAllEventPredicate(func(obj client.Object) bool {
|
||||
pvr := obj.(*velerov1api.PodVolumeRestore)
|
||||
return IsLegacyPVR(pvr)
|
||||
})
|
||||
|
||||
return ctrl.NewControllerManagedBy(mgr).Named("podvolumerestorelegacy").
|
||||
For(&velerov1api.PodVolumeRestore{}, builder.WithPredicates(pred)).
|
||||
Watches(&corev1api.Pod{}, handler.EnqueueRequestsFromMapFunc(c.findVolumeRestoresForPod)).
|
||||
Complete(c)
|
||||
}
|
||||
|
||||
func (c *PodVolumeRestoreReconcilerLegacy) findVolumeRestoresForPod(ctx context.Context, pod client.Object) []reconcile.Request {
|
||||
list := &velerov1api.PodVolumeRestoreList{}
|
||||
options := &client.ListOptions{
|
||||
LabelSelector: labels.Set(map[string]string{
|
||||
velerov1api.PodUIDLabel: string(pod.GetUID()),
|
||||
}).AsSelector(),
|
||||
}
|
||||
if err := c.Client.List(context.TODO(), list, options); err != nil {
|
||||
c.logger.WithField("pod", fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName())).WithError(err).
|
||||
Error("unable to list PodVolumeRestores")
|
||||
return []reconcile.Request{}
|
||||
}
|
||||
|
||||
requests := []reconcile.Request{}
|
||||
for _, item := range list.Items {
|
||||
if !IsLegacyPVR(&item) {
|
||||
continue
|
||||
}
|
||||
|
||||
requests = append(requests, reconcile.Request{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: item.GetNamespace(),
|
||||
Name: item.GetName(),
|
||||
},
|
||||
})
|
||||
}
|
||||
return requests
|
||||
}
|
||||
|
||||
func (c *PodVolumeRestoreReconcilerLegacy) OnDataPathCompleted(ctx context.Context, namespace string, pvrName string, result datapath.Result) {
|
||||
defer c.dataPathMgr.RemoveAsyncBR(pvrName)
|
||||
|
||||
log := c.logger.WithField("pvr", pvrName)
|
||||
|
||||
log.WithField("PVR", pvrName).Info("Async fs restore data path completed")
|
||||
|
||||
var pvr velerov1api.PodVolumeRestore
|
||||
if err := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil {
|
||||
log.WithError(err).Warn("Failed to get PVR on completion")
|
||||
return
|
||||
}
|
||||
|
||||
volumePath := result.Restore.Target.ByPath
|
||||
if volumePath == "" {
|
||||
_, _ = c.errorOut(ctx, &pvr, errors.New("path is empty"), "invalid restore target", log)
|
||||
return
|
||||
}
|
||||
|
||||
// Remove the .velero directory from the restored volume (it may contain done files from previous restores
|
||||
// of this volume, which we don't want to carry over). If this fails for any reason, log and continue, since
|
||||
// this is non-essential cleanup (the done files are named based on restore UID and the init container looks
|
||||
// for the one specific to the restore being executed).
|
||||
if err := os.RemoveAll(filepath.Join(volumePath, ".velero")); err != nil {
|
||||
log.WithError(err).Warnf("error removing .velero directory from directory %s", volumePath)
|
||||
}
|
||||
|
||||
var restoreUID types.UID
|
||||
for _, owner := range pvr.OwnerReferences {
|
||||
if boolptr.IsSetToTrue(owner.Controller) {
|
||||
restoreUID = owner.UID
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Create the .velero directory within the volume dir so we can write a done file
|
||||
// for this restore.
|
||||
if err := os.MkdirAll(filepath.Join(volumePath, ".velero"), 0755); err != nil {
|
||||
_, _ = c.errorOut(ctx, &pvr, err, "error creating .velero directory for done file", log)
|
||||
return
|
||||
}
|
||||
|
||||
// Write a done file with name=<restore-uid> into the just-created .velero dir
|
||||
// within the volume. The velero init container on the pod is waiting
|
||||
// for this file to exist in each restored volume before completing.
|
||||
if err := os.WriteFile(filepath.Join(volumePath, ".velero", string(restoreUID)), nil, 0644); err != nil { //nolint:gosec // Internal usage. No need to check.
|
||||
_, _ = c.errorOut(ctx, &pvr, err, "error writing done file", log)
|
||||
return
|
||||
}
|
||||
|
||||
original := pvr.DeepCopy()
|
||||
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCompleted
|
||||
pvr.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
|
||||
if err := c.Patch(ctx, &pvr, client.MergeFrom(original)); err != nil {
|
||||
log.WithError(err).Error("error updating PodVolumeRestore status")
|
||||
}
|
||||
|
||||
log.Info("Restore completed")
|
||||
}
|
||||
|
||||
func (c *PodVolumeRestoreReconcilerLegacy) OnDataPathFailed(ctx context.Context, namespace string, pvrName string, err error) {
|
||||
defer c.dataPathMgr.RemoveAsyncBR(pvrName)
|
||||
|
||||
log := c.logger.WithField("pvr", pvrName)
|
||||
|
||||
log.WithError(err).Error("Async fs restore data path failed")
|
||||
|
||||
var pvr velerov1api.PodVolumeRestore
|
||||
if getErr := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil {
|
||||
log.WithError(getErr).Warn("Failed to get PVR on failure")
|
||||
} else {
|
||||
_, _ = c.errorOut(ctx, &pvr, err, "data path restore failed", log)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *PodVolumeRestoreReconcilerLegacy) OnDataPathCancelled(ctx context.Context, namespace string, pvrName string) {
|
||||
defer c.dataPathMgr.RemoveAsyncBR(pvrName)
|
||||
|
||||
log := c.logger.WithField("pvr", pvrName)
|
||||
|
||||
log.Warn("Async fs restore data path canceled")
|
||||
|
||||
var pvr velerov1api.PodVolumeRestore
|
||||
if getErr := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil {
|
||||
log.WithError(getErr).Warn("Failed to get PVR on cancel")
|
||||
} else {
|
||||
_, _ = c.errorOut(ctx, &pvr, errors.New("PVR is canceled"), "data path restore canceled", log)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *PodVolumeRestoreReconcilerLegacy) OnDataPathProgress(ctx context.Context, namespace string, pvrName string, progress *uploader.Progress) {
|
||||
log := c.logger.WithField("pvr", pvrName)
|
||||
|
||||
var pvr velerov1api.PodVolumeRestore
|
||||
if err := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil {
|
||||
log.WithError(err).Warn("Failed to get PVB on progress")
|
||||
return
|
||||
}
|
||||
|
||||
original := pvr.DeepCopy()
|
||||
pvr.Status.Progress = veleroapishared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone}
|
||||
|
||||
if err := c.Client.Patch(ctx, &pvr, client.MergeFrom(original)); err != nil {
|
||||
log.WithError(err).Error("Failed to update progress")
|
||||
}
|
||||
}
|
||||
|
||||
func (c *PodVolumeRestoreReconcilerLegacy) closeDataPath(ctx context.Context, pvbName string) {
|
||||
fsRestore := c.dataPathMgr.GetAsyncBR(pvbName)
|
||||
if fsRestore != nil {
|
||||
fsRestore.Close(ctx)
|
||||
}
|
||||
|
||||
c.dataPathMgr.RemoveAsyncBR(pvbName)
|
||||
}
|
||||
|
||||
func IsLegacyPVR(pvr *velerov1api.PodVolumeRestore) bool {
|
||||
return pvr.Spec.UploaderType == "restic"
|
||||
}
|
||||
@@ -1,93 +0,0 @@
|
||||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
corev1api "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
)
|
||||
|
||||
func TestFindVolumeRestoresForPodLegacy(t *testing.T) {
|
||||
pod := &corev1api.Pod{}
|
||||
pod.UID = "uid"
|
||||
|
||||
scheme := runtime.NewScheme()
|
||||
scheme.AddKnownTypes(velerov1api.SchemeGroupVersion, &velerov1api.PodVolumeRestore{}, &velerov1api.PodVolumeRestoreList{})
|
||||
clientBuilder := fake.NewClientBuilder().WithScheme(scheme)
|
||||
|
||||
// no matching PVR
|
||||
reconciler := &PodVolumeRestoreReconcilerLegacy{
|
||||
Client: clientBuilder.Build(),
|
||||
logger: logrus.New(),
|
||||
}
|
||||
requests := reconciler.findVolumeRestoresForPod(t.Context(), pod)
|
||||
assert.Empty(t, requests)
|
||||
|
||||
// contain one matching PVR
|
||||
reconciler.Client = clientBuilder.WithLists(&velerov1api.PodVolumeRestoreList{
|
||||
Items: []velerov1api.PodVolumeRestore{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pvr1",
|
||||
Labels: map[string]string{
|
||||
velerov1api.PodUIDLabel: string(pod.GetUID()),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pvr2",
|
||||
Labels: map[string]string{
|
||||
velerov1api.PodUIDLabel: "non-matching-uid",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pvr3",
|
||||
Labels: map[string]string{
|
||||
velerov1api.PodUIDLabel: string(pod.GetUID()),
|
||||
},
|
||||
},
|
||||
Spec: velerov1api.PodVolumeRestoreSpec{
|
||||
UploaderType: "kopia",
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pvr4",
|
||||
Labels: map[string]string{
|
||||
velerov1api.PodUIDLabel: string(pod.GetUID()),
|
||||
},
|
||||
},
|
||||
Spec: velerov1api.PodVolumeRestoreSpec{
|
||||
UploaderType: "restic",
|
||||
},
|
||||
},
|
||||
},
|
||||
}).Build()
|
||||
requests = reconciler.findVolumeRestoresForPod(t.Context(), pod)
|
||||
assert.Len(t, requests, 1)
|
||||
}
|
||||
@@ -526,6 +526,7 @@ func TestFindPVRForTargetPod(t *testing.T) {
|
||||
velerov1api.PodUIDLabel: string(pod.GetUID()),
|
||||
},
|
||||
},
|
||||
Spec: velerov1api.PodVolumeRestoreSpec{UploaderType: uploader.KopiaType},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
@@ -688,6 +689,7 @@ func TestPodVolumeRestoreReconcile(t *testing.T) {
|
||||
mockClose bool
|
||||
needExclusiveUpdateError error
|
||||
constrained bool
|
||||
preserveEmptyUploader bool
|
||||
expected *velerov1api.PodVolumeRestore
|
||||
expectDeleted bool
|
||||
expectCancelRecord bool
|
||||
@@ -939,6 +941,13 @@ func TestPodVolumeRestoreReconcile(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
if !test.preserveEmptyUploader && test.pvr != nil && test.pvr.Spec.UploaderType == "" {
|
||||
test.pvr.Spec.UploaderType = uploader.KopiaType
|
||||
}
|
||||
if !test.preserveEmptyUploader && test.expected != nil && test.expected.Spec.UploaderType == "" {
|
||||
test.expected.Spec.UploaderType = uploader.KopiaType
|
||||
}
|
||||
|
||||
objs := []runtime.Object{daemonSet, node}
|
||||
|
||||
ctlObj := []client.Object{}
|
||||
@@ -1396,7 +1405,7 @@ func TestFindPVBForRestorePod(t *testing.T) {
|
||||
}{
|
||||
{
|
||||
name: "find pvr for pod",
|
||||
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
|
||||
pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: pvrName}).Status(corev1api.PodStatus{Phase: corev1api.PodRunning}).Result(),
|
||||
checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) {
|
||||
// Assert that the function returns a single request
|
||||
@@ -1407,7 +1416,7 @@ func TestFindPVBForRestorePod(t *testing.T) {
|
||||
},
|
||||
}, {
|
||||
name: "no selected label found for pod",
|
||||
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
|
||||
pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Result(),
|
||||
checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) {
|
||||
// Assert that the function returns a single request
|
||||
@@ -1415,7 +1424,7 @@ func TestFindPVBForRestorePod(t *testing.T) {
|
||||
},
|
||||
}, {
|
||||
name: "no matched pod",
|
||||
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
|
||||
pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: "non-existing-pvr"}).Result(),
|
||||
checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) {
|
||||
assert.Empty(t, requests)
|
||||
@@ -1423,12 +1432,20 @@ func TestFindPVBForRestorePod(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "pvr not accept",
|
||||
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(),
|
||||
pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(),
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: pvrName}).Result(),
|
||||
checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) {
|
||||
assert.Empty(t, requests)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "invalid uploader type",
|
||||
pvr: pvrBuilder().UploaderType("restic").Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
|
||||
pod: builder.ForPod(velerov1api.DefaultNamespace, pvrName).Labels(map[string]string{velerov1api.PVRLabel: pvrName}).Status(corev1api.PodStatus{Phase: corev1api.PodRunning}).Result(),
|
||||
checkFunc: func(pvr *velerov1api.PodVolumeRestore, requests []reconcile.Request) {
|
||||
assert.Empty(t, requests)
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
ctx := t.Context()
|
||||
@@ -1613,32 +1630,32 @@ func TestAttemptPVRResume(t *testing.T) {
|
||||
}{
|
||||
{
|
||||
name: "Other pvr",
|
||||
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhasePrepared).Result(),
|
||||
pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhasePrepared).Result(),
|
||||
},
|
||||
{
|
||||
name: "Other pvr",
|
||||
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
|
||||
pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseAccepted).Result(),
|
||||
},
|
||||
{
|
||||
name: "InProgress pvr, not the current node",
|
||||
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(),
|
||||
pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Result(),
|
||||
inProgressPvrs: []string{pvrName},
|
||||
},
|
||||
{
|
||||
name: "InProgress pvr, no resume error",
|
||||
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Node("node-1").Result(),
|
||||
pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Node("node-1").Result(),
|
||||
inProgressPvrs: []string{pvrName},
|
||||
},
|
||||
{
|
||||
name: "InProgress pvr, resume error, cancel error",
|
||||
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Node("node-1").Result(),
|
||||
pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Node("node-1").Result(),
|
||||
resumeErr: errors.New("fake-resume-error"),
|
||||
needErrs: []bool{false, false, true, false, false, false},
|
||||
inProgressPvrs: []string{pvrName},
|
||||
},
|
||||
{
|
||||
name: "InProgress pvr, resume error, cancel succeed",
|
||||
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhaseInProgress).Node("node-1").Result(),
|
||||
pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhaseInProgress).Node("node-1").Result(),
|
||||
resumeErr: errors.New("fake-resume-error"),
|
||||
cancelledPvrs: []string{pvrName},
|
||||
inProgressPvrs: []string{pvrName},
|
||||
@@ -1646,7 +1663,7 @@ func TestAttemptPVRResume(t *testing.T) {
|
||||
{
|
||||
name: "Error",
|
||||
needErrs: []bool{false, false, false, false, false, true},
|
||||
pvr: pvrBuilder().Phase(velerov1api.PodVolumeRestorePhasePrepared).Result(),
|
||||
pvr: pvrBuilder().UploaderType(uploader.KopiaType).Phase(velerov1api.PodVolumeRestorePhasePrepared).Result(),
|
||||
expectedError: "error to list PVRs: List error",
|
||||
},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user