From b54404fc56c75d1aa906b2627a139af8abd26b6e Mon Sep 17 00:00:00 2001
From: Lyndon-Li
Date: Tue, 3 Jun 2025 14:35:27 +0800
Subject: [PATCH] vgdp ms pvr data path

Signed-off-by: Lyndon-Li
---
 pkg/apis/velero/v1/pod_volume_restore_type.go |   4 +
 pkg/cmd/cli/podvolume/restore.go              | 310 ++++++++++++++
 pkg/podvolume/pvr_micro_service.go            | 377 ++++++++++++++++++
 3 files changed, 691 insertions(+)
 create mode 100644 pkg/cmd/cli/podvolume/restore.go
 create mode 100644 pkg/podvolume/pvr_micro_service.go

diff --git a/pkg/apis/velero/v1/pod_volume_restore_type.go b/pkg/apis/velero/v1/pod_volume_restore_type.go
index 34bc7e530..d8871b708 100644
--- a/pkg/apis/velero/v1/pod_volume_restore_type.go
+++ b/pkg/apis/velero/v1/pod_volume_restore_type.go
@@ -54,6 +54,10 @@ type PodVolumeRestoreSpec struct {
 	// +optional
 	// +nullable
 	UploaderSettings map[string]string `json:"uploaderSettings,omitempty"`
+
+	// Cancel indicates request to cancel the ongoing PodVolumeRestore. It can be set
+	// when the PodVolumeRestore is in InProgress phase
+	Cancel bool `json:"cancel,omitempty"`
 }
 
 // PodVolumeRestorePhase represents the lifecycle phase of a PodVolumeRestore.
diff --git a/pkg/cmd/cli/podvolume/restore.go b/pkg/cmd/cli/podvolume/restore.go
new file mode 100644
index 000000000..2894cf8c6
--- /dev/null
+++ b/pkg/cmd/cli/podvolume/restore.go
@@ -0,0 +1,310 @@
+/*
+Copyright The Velero Contributors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podvolume
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/bombsimon/logrusr/v3"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+	"github.com/spf13/cobra"
+	corev1api "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/klog/v2"
+	ctrl "sigs.k8s.io/controller-runtime"
+
+	"github.com/vmware-tanzu/velero/internal/credentials"
+	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
+	"github.com/vmware-tanzu/velero/pkg/buildinfo"
+	"github.com/vmware-tanzu/velero/pkg/client"
+	"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
+	"github.com/vmware-tanzu/velero/pkg/datapath"
+	"github.com/vmware-tanzu/velero/pkg/podvolume"
+	"github.com/vmware-tanzu/velero/pkg/repository"
+	"github.com/vmware-tanzu/velero/pkg/uploader"
+	"github.com/vmware-tanzu/velero/pkg/util/filesystem"
+	"github.com/vmware-tanzu/velero/pkg/util/logging"
+
+	ctlcache "sigs.k8s.io/controller-runtime/pkg/cache"
+	ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// pvrConfig holds the command-line settings of the pod volume restore command.
+type pvrConfig struct {
+	volumePath      string
+	pvrName         string
+	resourceTimeout time.Duration
+	winHPC          bool
+}
+
+// NewRestoreCommand creates the hidden "restore" command that runs the data path
+// of one PodVolumeRestore (PVR). The volume-path, pod-volume-restore and
+// resource-timeout flags are all marked as required.
+func NewRestoreCommand(f client.Factory) *cobra.Command {
+	logLevelFlag := logging.LogLevelFlag(logrus.InfoLevel)
+	formatFlag := logging.NewFormatFlag()
+
+	config := pvrConfig{}
+
+	command := &cobra.Command{
+		Use:    "restore",
+		Short:  "Run the velero pod volume restore",
+		Long:   "Run the velero pod volume restore",
+		Hidden: true,
+		Run: func(c *cobra.Command, args []string) {
+			logLevel := logLevelFlag.Parse()
+			logrus.Infof("Setting log-level to %s", strings.ToUpper(logLevel.String()))
+
+			logger := logging.DefaultLogger(logLevel, formatFlag.Parse())
+			logger.Infof("Starting Velero pod volume restore %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA())
+
+			f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
+			s, err := newPodVolumeRestore(logger, f, config)
+			if err != nil {
+				exitWithMessage(logger, false, "Failed to create pod volume restore, %v", err)
+				// Never fall through to s.run() with a nil s should exitWithMessage return.
+				return
+			}
+
+			s.run()
+		},
+	}
+
+	command.Flags().Var(logLevelFlag, "log-level", fmt.Sprintf("The level at which to log. Valid values are %s.", strings.Join(logLevelFlag.AllowedValues(), ", ")))
+	command.Flags().Var(formatFlag, "log-format", fmt.Sprintf("The format for log output. Valid values are %s.", strings.Join(formatFlag.AllowedValues(), ", ")))
+	command.Flags().StringVar(&config.volumePath, "volume-path", config.volumePath, "The full path of the volume to be restored")
+	command.Flags().StringVar(&config.pvrName, "pod-volume-restore", config.pvrName, "The PVR name")
+	command.Flags().DurationVar(&config.resourceTimeout, "resource-timeout", config.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters.")
+
+	_ = command.MarkFlagRequired("volume-path")
+	_ = command.MarkFlagRequired("pod-volume-restore")
+	_ = command.MarkFlagRequired("resource-timeout")
+
+	return command
+}
+
+// podVolumeRestore carries the clients, cache and settings used to run the data
+// path of one PVR.
+type podVolumeRestore struct {
+	logger      logrus.FieldLogger
+	ctx         context.Context
+	cancelFunc  context.CancelFunc
+	client      ctlclient.Client
+	cache       ctlcache.Cache
+	namespace   string
+	nodeName    string
+	config      pvrConfig
+	kubeClient  kubernetes.Interface
+	dataPathMgr *datapath.Manager
+}
+
+// newPodVolumeRestore builds a podVolumeRestore: the API client, a cache limited
+// to pods on the node named by the NODE_NAME environment variable and to PVRs in
+// the factory namespace, the kube client and a data path manager. The context
+// created here is canceled on every error path.
+func newPodVolumeRestore(logger logrus.FieldLogger, factory client.Factory, config pvrConfig) (*podVolumeRestore, error) {
+	ctx, cancelFunc := context.WithCancel(context.Background())
+
+	clientConfig, err := factory.ClientConfig()
+	if err != nil {
+		cancelFunc()
+		return nil, errors.Wrap(err, "error to create client config")
+	}
+
+	ctrl.SetLogger(logrusr.New(logger))
+	klog.SetLogger(logrusr.New(logger)) // klog.Logger is used by k8s.io/client-go
+
+	scheme := runtime.NewScheme()
+	if err := velerov1api.AddToScheme(scheme); err != nil {
+		cancelFunc()
+		return nil, errors.Wrap(err, "error to add velero v1 scheme")
+	}
+
+	if err := corev1api.AddToScheme(scheme); err != nil {
+		cancelFunc()
+		return nil, errors.Wrap(err, "error to add core v1 scheme")
+	}
+
+	nodeName := os.Getenv("NODE_NAME")
+
+	// use a field selector to filter to only pods scheduled on this node.
+	cacheOption := ctlcache.Options{
+		Scheme: scheme,
+		ByObject: map[ctlclient.Object]ctlcache.ByObject{
+			&corev1api.Pod{}: {
+				Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(),
+			},
+			&velerov1api.PodVolumeRestore{}: {
+				Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(),
+			},
+		},
+	}
+
+	cli, err := ctlclient.New(clientConfig, ctlclient.Options{
+		Scheme: scheme,
+	})
+	if err != nil {
+		cancelFunc()
+		return nil, errors.Wrap(err, "error to create client")
+	}
+
+	// Cache creation is attempted up to 10 times, one second apart.
+	var cache ctlcache.Cache
+	retry := 10
+	for {
+		cache, err = ctlcache.New(clientConfig, cacheOption)
+		if err == nil {
+			break
+		}
+
+		retry--
+		if retry == 0 {
+			break
+		}
+
+		logger.WithError(err).Warn("Failed to create client cache, need retry")
+
+		time.Sleep(time.Second)
+	}
+
+	if err != nil {
+		cancelFunc()
+		return nil, errors.Wrap(err, "error to create client cache")
+	}
+
+	s := &podVolumeRestore{
+		logger:     logger,
+		ctx:        ctx,
+		cancelFunc: cancelFunc,
+		client:     cli,
+		cache:      cache,
+		config:     config,
+		namespace:  factory.Namespace(),
+		nodeName:   nodeName,
+	}
+
+	s.kubeClient, err = factory.KubeClient()
+	if err != nil {
+		cancelFunc()
+		return nil, errors.Wrap(err, "error to create kube client")
+	}
+
+	s.dataPathMgr = datapath.NewManager(1)
+
+	return s, nil
+}
+
+// funcCreateDataPathRestore is the factory runDataPath uses to build the data
+// path service; being a variable, it can be swapped for another implementation.
+var funcCreateDataPathRestore = (*podVolumeRestore).createDataPathService
+
+// run hands cancelFunc to signals.CancelOnShutdown, starts the cache in a
+// background goroutine and then runs the data path.
+func (s *podVolumeRestore) run() {
+	signals.CancelOnShutdown(s.cancelFunc, s.logger)
+	go func() {
+		if err := s.cache.Start(s.ctx); err != nil {
+			s.logger.WithError(err).Warn("error starting cache")
+		}
+	}()
+
+	s.runDataPath()
+}
+
+// runDataPath creates, initializes and runs the data path service for the PVR,
+// then reports the outcome through funcExitWithMessage. The context is canceled
+// on every path, and the service is shut down whenever it was created.
+func (s *podVolumeRestore) runDataPath() {
+	s.logger.Infof("Starting micro service in node %s for PVR %s", s.nodeName, s.config.pvrName)
+
+	dpService, err := funcCreateDataPathRestore(s)
+	if err != nil {
+		s.cancelFunc()
+		funcExitWithMessage(s.logger, false, "Failed to create data path service for PVR %s: %v", s.config.pvrName, err)
+		return
+	}
+
+	s.logger.Infof("Starting data path service %s", s.config.pvrName)
+
+	err = dpService.Init()
+	if err != nil {
+		dpService.Shutdown()
+		s.cancelFunc()
+		funcExitWithMessage(s.logger, false, "Failed to init data path service for PVR %s: %v", s.config.pvrName, err)
+		return
+	}
+
+	result, err := dpService.RunCancelableDataPath(s.ctx)
+	if err != nil {
+		dpService.Shutdown()
+		s.cancelFunc()
+		funcExitWithMessage(s.logger, false, "Failed to run data path service for PVR %s: %v", s.config.pvrName, err)
+		return
+	}
+
+	s.logger.WithField("PVR", s.config.pvrName).Info("Data path service completed")
+
+	dpService.Shutdown()
+
+	s.logger.WithField("PVR", s.config.pvrName).Info("Data path service is shut down")
+
+	s.cancelFunc()
+
+	// result is data, not a format string: pass it as an argument so that any
+	// '%' it contains is reported verbatim.
+	funcExitWithMessage(s.logger, true, "%s", result)
+}
+
+// createDataPathService builds the restore micro service for the PVR, together
+// with the credential stores, repository ensurer and PVR informer it needs.
+func (s *podVolumeRestore) createDataPathService() (dataPathService, error) {
+	credentialFileStore, err := funcNewCredentialFileStore(
+		s.client,
+		s.namespace,
+		credentials.DefaultStoreDirectory(),
+		filesystem.NewFileSystem(),
+	)
+	if err != nil {
+		return nil, errors.Wrapf(err, "error to create credential file store")
+	}
+
+	credSecretStore, err := funcNewCredentialSecretStore(s.client, s.namespace)
+	if err != nil {
+		return nil, errors.Wrapf(err, "error to create credential secret store")
+	}
+
+	credGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}
+
+	// The micro service watches PodVolumeRestore objects for cancel requests, so
+	// the informer must be for PodVolumeRestore. DataDownload (velero v2alpha1) is
+	// neither added to the scheme nor configured in the cache built above.
+	pvrInformer, err := s.cache.GetInformer(s.ctx, &velerov1api.PodVolumeRestore{})
+	if err != nil {
+		return nil, errors.Wrap(err, "error to get controller-runtime informer from manager")
+	}
+
+	repoEnsurer := repository.NewEnsurer(s.client, s.logger, s.config.resourceTimeout)
+
+	return podvolume.NewRestoreMicroService(s.ctx, s.client, s.kubeClient, s.config.pvrName, s.namespace, s.nodeName, datapath.AccessPoint{
+		ByPath:  s.config.volumePath,
+		VolMode: uploader.PersistentVolumeFilesystem,
+	}, s.dataPathMgr, repoEnsurer, credGetter, pvrInformer, s.logger), nil
+}
diff --git a/pkg/podvolume/pvr_micro_service.go b/pkg/podvolume/pvr_micro_service.go
new file mode 100644
index 000000000..146c73992
--- /dev/null
+++ b/pkg/podvolume/pvr_micro_service.go
@@ -0,0 +1,377 @@
+/*
+Copyright The Velero Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podvolume
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	"sigs.k8s.io/controller-runtime/pkg/cache"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"github.com/vmware-tanzu/velero/internal/credentials"
+	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
+	"github.com/vmware-tanzu/velero/pkg/datapath"
+	"github.com/vmware-tanzu/velero/pkg/repository"
+	"github.com/vmware-tanzu/velero/pkg/uploader"
+	"github.com/vmware-tanzu/velero/pkg/util/boolptr"
+	"github.com/vmware-tanzu/velero/pkg/util/kube"
+
+	cachetool "k8s.io/client-go/tools/cache"
+)
+
+// RestoreMicroService runs the data path of one PodVolumeRestore (PVR) and hands
+// its outcome to RunCancelableDataPath through resultSignal.
+type RestoreMicroService struct {
+	ctx              context.Context
+	client           client.Client
+	kubeClient       kubernetes.Interface
+	repoEnsurer      *repository.Ensurer
+	credentialGetter *credentials.CredentialGetter
+	logger           logrus.FieldLogger
+	dataPathMgr      *datapath.Manager
+	eventRecorder    kube.EventRecorder
+
+	namespace        string
+	pvrName          string
+	pvr              *velerov1api.PodVolumeRestore
+	sourceTargetPath datapath.AccessPoint
+
+	resultSignal chan dataPathResult
+
+	// ddInformer is the informer Init registers the PodVolumeRestore handler on.
+	ddInformer cache.Informer
+	pvrHandler cachetool.ResourceEventHandlerRegistration
+	nodeName   string
+}
+
+// NewRestoreMicroService creates a RestoreMicroService for the PVR pvrName in
+// namespace. ddInformer must deliver PodVolumeRestore objects, since Init
+// registers the PVR cancel handler on it.
+func NewRestoreMicroService(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, pvrName string, namespace string, nodeName string,
+	sourceTargetPath datapath.AccessPoint, dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, cred *credentials.CredentialGetter,
+	ddInformer cache.Informer, log logrus.FieldLogger) *RestoreMicroService {
+	// resultSignal is buffered so that the one result a callback reports can be
+	// delivered even after RunCancelableDataPath has stopped receiving.
+	return &RestoreMicroService{
+		ctx:              ctx,
+		client:           client,
+		kubeClient:       kubeClient,
+		credentialGetter: cred,
+		logger:           log,
+		repoEnsurer:      repoEnsurer,
+		dataPathMgr:      dataPathMgr,
+		namespace:        namespace,
+		pvrName:          pvrName,
+		sourceTargetPath: sourceTargetPath,
+		nodeName:         nodeName,
+		resultSignal:     make(chan dataPathResult, 1),
+		ddInformer:       ddInformer,
+	}
+}
+
+// Init creates the event recorder and registers an update handler that cancels
+// the data path when Spec.Cancel of this PVR turns true while the PVR is in the
+// InProgress phase.
+func (r *RestoreMicroService) Init() error {
+	r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.pvrName, r.nodeName, r.logger)
+
+	handler, err := r.ddInformer.AddEventHandler(
+		cachetool.ResourceEventHandlerFuncs{
+			UpdateFunc: func(oldObj any, newObj any) {
+				// Ignore objects of an unexpected type instead of panicking on them.
+				oldPvr, ok := oldObj.(*velerov1api.PodVolumeRestore)
+				if !ok {
+					return
+				}
+
+				newPvr, ok := newObj.(*velerov1api.PodVolumeRestore)
+				if !ok {
+					return
+				}
+
+				if newPvr.Name != r.pvrName {
+					return
+				}
+
+				if newPvr.Status.Phase != velerov1api.PodVolumeRestorePhaseInProgress {
+					return
+				}
+
+				if newPvr.Spec.Cancel && !oldPvr.Spec.Cancel {
+					r.cancelPodVolumeRestore(newPvr)
+				}
+			},
+		},
+	)
+
+	if err != nil {
+		return errors.Wrap(err, "error adding PVR handler")
+	}
+
+	r.pvrHandler = handler
+
+	return nil
+}
+
+// RunCancelableDataPath waits until the PVR is in the InProgress phase, starts
+// the asynchronous file system restore and blocks until a callback reports the
+// result or ctx is done. It returns the reported result, or an error.
+func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string, error) {
+	log := r.logger.WithFields(logrus.Fields{
+		"PVR": r.pvrName,
+	})
+
+	pvr := &velerov1api.PodVolumeRestore{}
+	err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) {
+		err := r.client.Get(ctx, types.NamespacedName{
+			Namespace: r.namespace,
+			Name:      r.pvrName,
+		}, pvr)
+		if apierrors.IsNotFound(err) {
+			return false, nil
+		}
+
+		if err != nil {
+			return true, errors.Wrapf(err, "error to get PVR %s", r.pvrName)
+		}
+
+		return pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress, nil
+	})
+	if err != nil {
+		log.WithError(err).Error("Failed to wait PVR")
+		return "", errors.Wrap(err, "error waiting for PVR")
+	}
+
+	r.pvr = pvr
+
+	log.Info("Run cancelable PVR")
+
+	callbacks := datapath.Callbacks{
+		OnCompleted: r.OnPvrCompleted,
+		OnFailed:    r.OnPvrFailed,
+		OnCancelled: r.OnPvrCancelled,
+		OnProgress:  r.OnPvrProgress,
+	}
+
+	fsRestore, err := r.dataPathMgr.CreateFileSystemBR(pvr.Name, podVolumeRequestor, ctx, r.client, pvr.Namespace, callbacks, log)
+	if err != nil {
+		return "", errors.Wrap(err, "error to create data path")
+	}
+
+	log.Debug("Found volume path")
+	if err := fsRestore.Init(ctx,
+		&datapath.FSBRInitParam{
+			BSLName:           pvr.Spec.BackupStorageLocation,
+			SourceNamespace:   pvr.Spec.SourceNamespace,
+			UploaderType:      pvr.Spec.UploaderType,
+			RepositoryType:    velerov1api.BackupRepositoryTypeKopia,
+			RepoIdentifier:    "",
+			RepositoryEnsurer: r.repoEnsurer,
+			CredentialGetter:  r.credentialGetter,
+		}); err != nil {
+		return "", errors.Wrap(err, "error to initialize data path")
+	}
+	log.Info("fs init")
+
+	if err := fsRestore.StartRestore(pvr.Spec.SnapshotID, r.sourceTargetPath, pvr.Spec.UploaderSettings); err != nil {
+		return "", errors.Wrap(err, "error starting data path restore")
+	}
+
+	log.Info("Async fs restore data path started")
+	r.eventRecorder.Event(pvr, false, datapath.EventReasonStarted, "Data path for %s started", pvr.Name)
+
+	result := ""
+	select {
+	case <-ctx.Done():
+		err = errors.New("timed out waiting for fs restore to complete")
+	case res := <-r.resultSignal:
+		err = res.err
+		result = res.result
+	}
+
+	if err != nil {
+		log.WithError(err).Error("Async fs restore was not completed")
+	}
+
+	r.eventRecorder.EndingEvent(pvr, false, datapath.EventReasonStopped, "Data path for %s stopped", pvr.Name)
+
+	return result, err
+}
+
+// Shutdown stops the event recorder, closes the data path of the PVR and removes
+// the PVR update handler registered by Init, if any.
+func (r *RestoreMicroService) Shutdown() {
+	r.eventRecorder.Shutdown()
+	r.closeDataPath(r.ctx, r.pvrName)
+
+	if r.pvrHandler != nil {
+		if err := r.ddInformer.RemoveEventHandler(r.pvrHandler); err != nil {
+			r.logger.WithError(err).Warn("Failed to remove PVR handler")
+		}
+	}
+}
+
+// OnPvrCompleted is the data path callback for a successful restore. It writes
+// the done file, named after the UID of the PVR's controller owner, into the
+// .velero directory of the restored volume and then reports the marshaled restore
+// result; any failure on the way is reported as a failed PVR instead.
+func (r *RestoreMicroService) OnPvrCompleted(ctx context.Context, namespace string, pvrName string, result datapath.Result) {
+	log := r.logger.WithField("PVR", pvrName)
+
+	volumePath := result.Restore.Target.ByPath
+	if volumePath == "" {
+		r.recordPvrFailed(pvrName, "invalid restore target", errors.New("path is empty"))
+		return
+	}
+
+	// Remove the .velero directory from the restored volume (it may contain done files from previous restores
+	// of this volume, which we don't want to carry over). If this fails for any reason, log and continue, since
+	// this is non-essential cleanup (the done files are named based on restore UID and the init container looks
+	// for the one specific to the restore being executed).
+	if err := os.RemoveAll(filepath.Join(volumePath, ".velero")); err != nil {
+		log.WithError(err).Warnf("error removing .velero directory from directory %s", volumePath)
+	}
+
+	var restoreUID types.UID
+	for _, owner := range r.pvr.OwnerReferences {
+		if boolptr.IsSetToTrue(owner.Controller) {
+			restoreUID = owner.UID
+			break
+		}
+	}
+
+	// Without a controller owner there is no restore UID to name the done file
+	// after; writing to the empty name would target the .velero directory itself.
+	if restoreUID == "" {
+		r.recordPvrFailed(pvrName, "error finding restore UID for done file", errors.New("PVR has no controller owner reference"))
+		return
+	}
+
+	// Create the .velero directory within the volume dir so we can write a done file
+	// for this restore.
+	if err := os.MkdirAll(filepath.Join(volumePath, ".velero"), 0755); err != nil {
+		r.recordPvrFailed(pvrName, "error creating .velero directory for done file", err)
+		return
+	}
+
+	// Write a done file with name=<restore-uid> into the just-created .velero dir
+	// within the volume. The velero init container on the pod is waiting
+	// for this file to exist in each restored volume before completing.
+	if err := os.WriteFile(filepath.Join(volumePath, ".velero", string(restoreUID)), nil, 0644); err != nil { //nolint:gosec // Internal usage. No need to check.
+		r.recordPvrFailed(pvrName, "error writing done file", err)
+		return
+	}
+
+	restoreBytes, err := funcMarshal(result.Restore)
+	if err != nil {
+		log.WithError(err).Errorf("Failed to marshal restore result %v", result.Restore)
+		r.recordPvrFailed(pvrName, fmt.Sprintf("error marshaling restore result %v", result.Restore), err)
+		return
+	}
+
+	// The JSON is data, not a format string: pass it as an argument.
+	r.eventRecorder.Event(r.pvr, false, datapath.EventReasonCompleted, "%s", string(restoreBytes))
+	r.resultSignal <- dataPathResult{
+		result: string(restoreBytes),
+	}
+
+	log.Info("Async fs restore data path completed")
+}
+
+// recordPvrFailed emits a Failed event for the PVR and reports err, wrapped with
+// msg, as the data path result. The PVR name argument is kept for the existing
+// call sites but is not used: msg is plain text, not a format string.
+func (r *RestoreMicroService) recordPvrFailed(_ string, msg string, err error) {
+	r.eventRecorder.Event(r.pvr, false, datapath.EventReasonFailed, "%s, error %v", msg, err)
+	r.resultSignal <- dataPathResult{
+		err: errors.Wrap(err, msg),
+	}
+}
+
+// OnPvrFailed is the data path callback for a failed restore; it logs err and
+// reports it as the data path result.
+func (r *RestoreMicroService) OnPvrFailed(ctx context.Context, namespace string, pvrName string, err error) {
+	log := r.logger.WithField("PVR", pvrName)
+	log.WithError(err).Error("Async fs restore data path failed")
+
+	r.recordPvrFailed(pvrName, fmt.Sprintf("Data path for PVR %s failed", pvrName), err)
+}
+
+// OnPvrCancelled is the data path callback for a canceled restore; it reports
+// datapath.ErrCancelled as the data path result.
+func (r *RestoreMicroService) OnPvrCancelled(ctx context.Context, namespace string, pvrName string) {
+	log := r.logger.WithField("PVR", pvrName)
+	log.Warn("Async fs restore data path canceled")
+
+	r.eventRecorder.Event(r.pvr, false, datapath.EventReasonCancelled, "Data path for PVR %s canceled", pvrName)
+	r.resultSignal <- dataPathResult{
+		err: errors.New(datapath.ErrCancelled),
+	}
+}
+
+// OnPvrProgress is the data path callback for progress updates; it publishes the
+// marshaled progress as an event.
+func (r *RestoreMicroService) OnPvrProgress(ctx context.Context, namespace string, pvrName string, progress *uploader.Progress) {
+	log := r.logger.WithFields(logrus.Fields{
+		"PVR": pvrName,
+	})
+
+	progressBytes, err := funcMarshal(progress)
+	if err != nil {
+		log.WithError(err).Errorf("Failed to marshal progress %v", progress)
+		return
+	}
+
+	r.eventRecorder.Event(r.pvr, false, datapath.EventReasonProgress, "%s", string(progressBytes))
+}
+
+// closeDataPath closes the async data path registered under pvrName, if any, and
+// removes it from the data path manager.
+func (r *RestoreMicroService) closeDataPath(ctx context.Context, pvrName string) {
+	fsRestore := r.dataPathMgr.GetAsyncBR(pvrName)
+	if fsRestore != nil {
+		fsRestore.Close(ctx)
+	}
+
+	r.dataPathMgr.RemoveAsyncBR(pvrName)
+}
+
+// cancelPodVolumeRestore cancels the running data path of pvr. When no data path
+// is registered for it, the cancellation is reported directly.
+func (r *RestoreMicroService) cancelPodVolumeRestore(pvr *velerov1api.PodVolumeRestore) {
+	r.logger.WithField("PVR", pvr.Name).Info("PVR is being canceled")
+
+	r.eventRecorder.Event(pvr, false, datapath.EventReasonCancelling, "Canceling for PVR %s", pvr.Name)
+
+	fsRestore := r.dataPathMgr.GetAsyncBR(pvr.Name)
+	if fsRestore == nil {
+		r.OnPvrCancelled(r.ctx, pvr.GetNamespace(), pvr.GetName())
+		return
+	}
+
+	fsRestore.Cancel()
+}