vgdp ms pvr data path

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
Lyndon-Li
2025-06-03 14:35:27 +08:00
parent d2c6b6bc3e
commit b54404fc56
3 changed files with 622 additions and 0 deletions


@@ -54,6 +54,10 @@ type PodVolumeRestoreSpec struct {
// +optional
// +nullable
UploaderSettings map[string]string `json:"uploaderSettings,omitempty"`
// Cancel indicates a request to cancel the ongoing PodVolumeRestore. It can be set
// when the PodVolumeRestore is in the InProgress phase.
Cancel bool `json:"cancel,omitempty"`
}
// PodVolumeRestorePhase represents the lifecycle phase of a PodVolumeRestore.
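Cancellation is requested by setting Spec.Cancel on an InProgress PodVolumeRestore; the micro service watches for this transition (see RestoreMicroService.Init below). A minimal sketch of issuing such a request with a controller-runtime client (the cli variable, ctx, and the namespace/name values are assumptions, not part of this commit):

// Hypothetical caller-side snippet: fetch the PVR, flip Spec.Cancel, and
// send a merge patch so only the cancel flag is updated.
pvr := &velerov1api.PodVolumeRestore{}
if err := cli.Get(ctx, types.NamespacedName{Namespace: "velero", Name: pvrName}, pvr); err != nil {
return err
}
original := pvr.DeepCopy()
pvr.Spec.Cancel = true // honored only while the PVR is in the InProgress phase
if err := cli.Patch(ctx, pvr, ctlclient.MergeFrom(original)); err != nil {
return err
}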


@@ -0,0 +1,284 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podvolume
import (
"context"
"fmt"
"os"
"strings"
"time"
"github.com/bombsimon/logrusr/v3"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
corev1api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/buildinfo"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/podvolume"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"
ctlcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
)
type pvrConfig struct {
volumePath string
pvrName string
resourceTimeout time.Duration
winHPC bool
}
func NewRestoreCommand(f client.Factory) *cobra.Command {
logLevelFlag := logging.LogLevelFlag(logrus.InfoLevel)
formatFlag := logging.NewFormatFlag()
config := pvrConfig{}
command := &cobra.Command{
Use: "restore",
Short: "Run the velero pod volume restore",
Long: "Run the velero pod volume restore",
Hidden: true,
Run: func(c *cobra.Command, args []string) {
logLevel := logLevelFlag.Parse()
logrus.Infof("Setting log-level to %s", strings.ToUpper(logLevel.String()))
logger := logging.DefaultLogger(logLevel, formatFlag.Parse())
logger.Infof("Starting Velero pod volume restore %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA())
f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
s, err := newPodVolumeRestore(logger, f, config)
if err != nil {
exitWithMessage(logger, false, "Failed to create pod volume restore, %v", err)
}
s.run()
},
}
command.Flags().Var(logLevelFlag, "log-level", fmt.Sprintf("The level at which to log. Valid values are %s.", strings.Join(logLevelFlag.AllowedValues(), ", ")))
command.Flags().Var(formatFlag, "log-format", fmt.Sprintf("The format for log output. Valid values are %s.", strings.Join(formatFlag.AllowedValues(), ", ")))
command.Flags().StringVar(&config.volumePath, "volume-path", config.volumePath, "The full path of the volume to be restored")
command.Flags().StringVar(&config.pvrName, "pod-volume-restore", config.pvrName, "The PVR name")
command.Flags().DurationVar(&config.resourceTimeout, "resource-timeout", config.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters.")
_ = command.MarkFlagRequired("volume-path")
_ = command.MarkFlagRequired("pod-volume-restore")
_ = command.MarkFlagRequired("resource-timeout")
return command
}
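// For reference, this hidden command is meant to be launched inside the
// restore pod rather than run by users directly; a hypothetical invocation
// (all values are placeholders):
//
//	restore --volume-path=/<volume-mount-path> --pod-volume-restore=<pvr-name> --resource-timeout=10m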
type podVolumeRestore struct {
logger logrus.FieldLogger
ctx context.Context
cancelFunc context.CancelFunc
client ctlclient.Client
cache ctlcache.Cache
namespace string
nodeName string
config pvrConfig
kubeClient kubernetes.Interface
dataPathMgr *datapath.Manager
}
func newPodVolumeRestore(logger logrus.FieldLogger, factory client.Factory, config pvrConfig) (*podVolumeRestore, error) {
ctx, cancelFunc := context.WithCancel(context.Background())
clientConfig, err := factory.ClientConfig()
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client config")
}
ctrl.SetLogger(logrusr.New(logger))
klog.SetLogger(logrusr.New(logger)) // klog.Logger is used by k8s.io/client-go
scheme := runtime.NewScheme()
if err := velerov1api.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add velero v1 scheme")
}
if err := corev1api.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add core v1 scheme")
}
nodeName := os.Getenv("NODE_NAME")
// use field selectors to restrict the cache to pods scheduled on this node
// and to PodVolumeRestores in the Velero namespace.
cacheOption := ctlcache.Options{
Scheme: scheme,
ByObject: map[ctlclient.Object]ctlcache.ByObject{
&corev1api.Pod{}: {
Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(),
},
&velerov1api.PodVolumeRestore{}: {
Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(),
},
},
}
cli, err := ctlclient.New(clientConfig, ctlclient.Options{
Scheme: scheme,
})
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client")
}
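// Cache creation may fail transiently (e.g. the API server is not yet
// reachable right after the pod starts), so retry up to 10 times at
// one-second intervals before giving up.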
var cache ctlcache.Cache
retry := 10
for {
cache, err = ctlcache.New(clientConfig, cacheOption)
if err == nil {
break
}
retry--
if retry == 0 {
break
}
logger.WithError(err).Warn("Failed to create client cache, need retry")
time.Sleep(time.Second)
}
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client cache")
}
s := &podVolumeRestore{
logger: logger,
ctx: ctx,
cancelFunc: cancelFunc,
client: cli,
cache: cache,
config: config,
namespace: factory.Namespace(),
nodeName: nodeName,
}
s.kubeClient, err = factory.KubeClient()
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create kube client")
}
s.dataPathMgr = datapath.NewManager(1)
return s, nil
}
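// funcCreateDataPathRestore is a package-level function variable (like the
// funcNewCredentialFileStore, funcNewCredentialSecretStore, and
// funcExitWithMessage seams used elsewhere in this package), presumably so
// unit tests can substitute fakes for the data path service.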
var funcCreateDataPathRestore = (*podVolumeRestore).createDataPathService
func (s *podVolumeRestore) run() {
signals.CancelOnShutdown(s.cancelFunc, s.logger)
go func() {
if err := s.cache.Start(s.ctx); err != nil {
s.logger.WithError(err).Warn("error starting cache")
}
}()
s.runDataPath()
}
func (s *podVolumeRestore) runDataPath() {
s.logger.Infof("Starting micro service in node %s for PVR %s", s.nodeName, s.config.pvrName)
dpService, err := funcCreateDataPathRestore(s)
if err != nil {
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to create data path service for PVR %s: %v", s.config.pvrName, err)
return
}
s.logger.Infof("Starting data path service %s", s.config.pvrName)
err = dpService.Init()
if err != nil {
dpService.Shutdown()
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to init data path service for PVR %s: %v", s.config.pvrName, err)
return
}
result, err := dpService.RunCancelableDataPath(s.ctx)
if err != nil {
dpService.Shutdown()
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to run data path service for PVR %s: %v", s.config.pvrName, err)
return
}
s.logger.WithField("PVR", s.config.pvrName).Info("Data path service completed")
dpService.Shutdown()
s.logger.WithField("PVR", s.config.pvrName).Info("Data path service is shut down")
s.cancelFunc()
funcExitWithMessage(s.logger, true, result)
}
func (s *podVolumeRestore) createDataPathService() (dataPathService, error) {
credentialFileStore, err := funcNewCredentialFileStore(
s.client,
s.namespace,
credentials.DefaultStoreDirectory(),
filesystem.NewFileSystem(),
)
if err != nil {
return nil, errors.Wrapf(err, "error to create credential file store")
}
credSecretStore, err := funcNewCredentialSecretStore(s.client, s.namespace)
if err != nil {
return nil, errors.Wrapf(err, "error to create credential secret store")
}
credGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}
pvrInformer, err := s.cache.GetInformer(s.ctx, &velerov1api.PodVolumeRestore{})
if err != nil {
return nil, errors.Wrap(err, "error to get PodVolumeRestore informer from cache")
}
repoEnsurer := repository.NewEnsurer(s.client, s.logger, s.config.resourceTimeout)
return podvolume.NewRestoreMicroService(s.ctx, s.client, s.kubeClient, s.config.pvrName, s.namespace, s.nodeName, datapath.AccessPoint{
ByPath: s.config.volumePath,
VolMode: uploader.PersistentVolumeFilesystem,
}, s.dataPathMgr, repoEnsurer, credGetter, pvrInformer, s.logger), nil
}
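The dataPathService interface returned by createDataPathService is not part of this commit (presumably it is defined alongside the backup counterpart in this package); from its use in runDataPath it must look roughly like the sketch below, which RestoreMicroService in the next file satisfies:

// Reconstructed from usage in runDataPath; shown only to make the control
// flow above readable.
type dataPathService interface {
Init() error
RunCancelableDataPath(ctx context.Context) (string, error)
Shutdown()
}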


@@ -0,0 +1,334 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podvolume
import (
"context"
"fmt"
"os"
"path/filepath"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/kube"
cachetool "k8s.io/client-go/tools/cache"
)
// RestoreMicroService processes pod volume restores inside the restore pod
type RestoreMicroService struct {
ctx context.Context
client client.Client
kubeClient kubernetes.Interface
repoEnsurer *repository.Ensurer
credentialGetter *credentials.CredentialGetter
logger logrus.FieldLogger
dataPathMgr *datapath.Manager
eventRecorder kube.EventRecorder
namespace string
pvrName string
pvr *velerov1api.PodVolumeRestore
sourceTargetPath datapath.AccessPoint
resultSignal chan dataPathResult
pvrInformer cache.Informer
pvrHandler cachetool.ResourceEventHandlerRegistration
nodeName string
}
func NewRestoreMicroService(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, pvrName string, namespace string, nodeName string,
sourceTargetPath datapath.AccessPoint, dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, cred *credentials.CredentialGetter,
pvrInformer cache.Informer, log logrus.FieldLogger) *RestoreMicroService {
return &RestoreMicroService{
ctx: ctx,
client: client,
kubeClient: kubeClient,
credentialGetter: cred,
logger: log,
repoEnsurer: repoEnsurer,
dataPathMgr: dataPathMgr,
namespace: namespace,
pvrName: pvrName,
sourceTargetPath: sourceTargetPath,
nodeName: nodeName,
resultSignal: make(chan dataPathResult),
pvrInformer: pvrInformer,
}
}
func (r *RestoreMicroService) Init() error {
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.pvrName, r.nodeName, r.logger)
handler, err := r.pvrInformer.AddEventHandler(
cachetool.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj any, newObj any) {
oldPvr := oldObj.(*velerov1api.PodVolumeRestore)
newPvr := newObj.(*velerov1api.PodVolumeRestore)
if newPvr.Name != r.pvrName {
return
}
if newPvr.Status.Phase != velerov1api.PodVolumeRestorePhaseInProgress {
return
}
if newPvr.Spec.Cancel && !oldPvr.Spec.Cancel {
r.cancelPodVolumeRestore(newPvr)
}
},
},
)
if err != nil {
return errors.Wrap(err, "error adding PVR handler")
}
r.pvrHandler = handler
return nil
}
func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string, error) {
log := r.logger.WithFields(logrus.Fields{
"PVR": r.pvrName,
})
pvr := &velerov1api.PodVolumeRestore{}
err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) {
err := r.client.Get(ctx, types.NamespacedName{
Namespace: r.namespace,
Name: r.pvrName,
}, pvr)
if apierrors.IsNotFound(err) {
return false, nil
}
if err != nil {
return true, errors.Wrapf(err, "error to get PVR %s", r.pvrName)
}
return pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress, nil
})
if err != nil {
log.WithError(err).Error("Failed to wait PVR")
return "", errors.Wrap(err, "error waiting for PVR")
}
r.pvr = pvr
log.Info("Run cancelable PVR")
callbacks := datapath.Callbacks{
OnCompleted: r.OnPvrCompleted,
OnFailed: r.OnPvrFailed,
OnCancelled: r.OnPvrCancelled,
OnProgress: r.OnPvrProgress,
}
fsRestore, err := r.dataPathMgr.CreateFileSystemBR(pvr.Name, podVolumeRequestor, ctx, r.client, pvr.Namespace, callbacks, log)
if err != nil {
return "", errors.Wrap(err, "error to create data path")
}
log.Debug("Found volume path")
if err := fsRestore.Init(ctx,
&datapath.FSBRInitParam{
BSLName: pvr.Spec.BackupStorageLocation,
SourceNamespace: pvr.Spec.SourceNamespace,
UploaderType: pvr.Spec.UploaderType,
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
RepoIdentifier: "",
RepositoryEnsurer: r.repoEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {
return "", errors.Wrap(err, "error to initialize data path")
}
log.Info("fs init")
if err := fsRestore.StartRestore(pvr.Spec.SnapshotID, r.sourceTargetPath, pvr.Spec.UploaderSettings); err != nil {
return "", errors.Wrap(err, "error starting data path restore")
}
log.Info("Async fs restore data path started")
r.eventRecorder.Event(pvr, false, datapath.EventReasonStarted, "Data path for %s started", pvr.Name)
result := ""
select {
case <-ctx.Done():
err = errors.New("timed out waiting for fs restore to complete")
break
case res := <-r.resultSignal:
err = res.err
result = res.result
break
}
if err != nil {
log.WithError(err).Error("Async fs restore was not completed")
}
r.eventRecorder.EndingEvent(pvr, false, datapath.EventReasonStopped, "Data path for %s stopped", pvr.Name)
return result, err
}
func (r *RestoreMicroService) Shutdown() {
r.eventRecorder.Shutdown()
r.closeDataPath(r.ctx, r.pvrName)
if r.pvrHandler != nil {
if err := r.pvrInformer.RemoveEventHandler(r.pvrHandler); err != nil {
r.logger.WithError(err).Warn("Failed to remove PVR handler")
}
}
}
func (r *RestoreMicroService) OnPvrCompleted(ctx context.Context, namespace string, pvrName string, result datapath.Result) {
log := r.logger.WithField("PVR", pvrName)
volumePath := result.Restore.Target.ByPath
if volumePath == "" {
r.recordPvrFailed(pvrName, "invalid restore target", errors.New("path is empty"))
return
}
// Remove the .velero directory from the restored volume (it may contain done files from previous restores
// of this volume, which we don't want to carry over). If this fails for any reason, log and continue, since
// this is non-essential cleanup (the done files are named based on restore UID and the init container looks
// for the one specific to the restore being executed).
if err := os.RemoveAll(filepath.Join(volumePath, ".velero")); err != nil {
log.WithError(err).Warnf("error removing .velero directory from directory %s", volumePath)
}
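// The PVR is created with a controller owner reference to its Restore, so
// the controller owner's UID is the restore UID that names the done file
// written below.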
var restoreUID types.UID
for _, owner := range r.pvr.OwnerReferences {
if boolptr.IsSetToTrue(owner.Controller) {
restoreUID = owner.UID
break
}
}
// Create the .velero directory within the volume dir so we can write a done file
// for this restore.
if err := os.MkdirAll(filepath.Join(volumePath, ".velero"), 0755); err != nil {
r.recordPvrFailed(pvrName, "error creating .velero directory for done file", err)
return
}
// Write a done file with name=<restore-uid> into the just-created .velero dir
// within the volume. The velero init container on the pod is waiting
// for this file to exist in each restored volume before completing.
if err := os.WriteFile(filepath.Join(volumePath, ".velero", string(restoreUID)), nil, 0644); err != nil { //nolint:gosec // Internal usage. No need to check.
r.recordPvrFailed(pvrName, "error writing done file", err)
return
}
restoreBytes, err := funcMarshal(result.Restore)
if err != nil {
log.WithError(err).Errorf("Failed to marshal restore result %v", result.Restore)
r.recordPvrFailed(pvrName, fmt.Sprintf("error marshaling restore result %v", result.Restore), err)
} else {
r.eventRecorder.Event(r.pvr, false, datapath.EventReasonCompleted, string(restoreBytes))
r.resultSignal <- dataPathResult{
result: string(restoreBytes),
}
}
log.Info("Async fs restore data path completed")
}
func (r *RestoreMicroService) recordPvrFailed(pvrName string, msg string, err error) {
evtMsg := fmt.Sprintf("%s, error %v", msg, err)
r.eventRecorder.Event(r.pvr, false, datapath.EventReasonFailed, evtMsg)
r.resultSignal <- dataPathResult{
err: errors.Wrap(err, msg),
}
}
func (r *RestoreMicroService) OnPvrFailed(ctx context.Context, namespace string, pvrName string, err error) {
log := r.logger.WithField("PVR", pvrName)
log.WithError(err).Error("Async fs restore data path failed")
r.recordPvrFailed(pvrName, fmt.Sprintf("Data path for PVR %s failed", pvrName), err)
}
func (r *RestoreMicroService) OnPvrCancelled(ctx context.Context, namespace string, pvrName string) {
log := r.logger.WithField("PVR", pvrName)
log.Warn("Async fs restore data path canceled")
r.eventRecorder.Event(r.pvr, false, datapath.EventReasonCancelled, "Data path for PVR %s canceled", pvrName)
r.resultSignal <- dataPathResult{
err: errors.New(datapath.ErrCancelled),
}
}
func (r *RestoreMicroService) OnPvrProgress(ctx context.Context, namespace string, pvrName string, progress *uploader.Progress) {
log := r.logger.WithFields(logrus.Fields{
"PVR": pvrName,
})
progressBytes, err := funcMarshal(progress)
if err != nil {
log.WithError(err).Errorf("Failed to marshal progress %v", progress)
return
}
r.eventRecorder.Event(r.pvr, false, datapath.EventReasonProgress, string(progressBytes))
}
func (r *RestoreMicroService) closeDataPath(ctx context.Context, pvrName string) {
fsRestore := r.dataPathMgr.GetAsyncBR(pvrName)
if fsRestore != nil {
fsRestore.Close(ctx)
}
r.dataPathMgr.RemoveAsyncBR(pvrName)
}
func (r *RestoreMicroService) cancelPodVolumeRestore(pvr *velerov1api.PodVolumeRestore) {
r.logger.WithField("PVR", pvr.Name).Info("PVR is being canceled")
r.eventRecorder.Event(pvr, false, datapath.EventReasonCancelling, "Canceling for PVR %s", pvr.Name)
fsRestore := r.dataPathMgr.GetAsyncBR(pvr.Name)
if fsRestore == nil {
r.OnPvrCancelled(r.ctx, pvr.GetNamespace(), pvr.GetName())
} else {
fsRestore.Cancel()
}
}
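Similarly, the dataPathResult payload sent over resultSignal is defined outside this commit (presumably next to the backup micro service); from its use in RunCancelableDataPath and the callbacks above, it is, in sketch form:

// Reconstructed from usage: pairs a terminal result string with any error.
type dataPathResult struct {
err error
result string
}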