Handle Velero server and node-agent restarts for the VGDP micro-service (MS) for PVB

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
Lyndon-Li
2025-06-19 16:22:44 +08:00
parent 2fb9fbc4b4
commit d73cef3b94
4 changed files with 18 additions and 105 deletions

View File

@@ -911,10 +911,10 @@ func UpdatePVRWithRetry(ctx context.Context, client client.Client, namespacedNam
// funcResumeCancellablePVR is a method expression for
// (*PodVolumeRestoreReconciler).resumeCancellableDataPath. AttemptPVRResume
// invokes the resume logic through this variable instead of calling the method
// directly, passing the reconciler as the first argument.
// NOTE(review): presumably this indirection exists so tests can substitute a
// stub for the resume step — confirm against the test file.
var funcResumeCancellablePVR = (*PodVolumeRestoreReconciler).resumeCancellableDataPath
func (c *PodVolumeRestoreReconciler) AttemptPVRResume(ctx context.Context, logger *logrus.Entry, ns string) error {
func (r *PodVolumeRestoreReconciler) AttemptPVRResume(ctx context.Context, logger *logrus.Entry, ns string) error {
pvrs := &velerov1api.PodVolumeRestoreList{}
if err := c.client.List(ctx, pvrs, &client.ListOptions{Namespace: ns}); err != nil {
c.logger.WithError(errors.WithStack(err)).Error("failed to list PVRs")
if err := r.client.List(ctx, pvrs, &client.ListOptions{Namespace: ns}); err != nil {
r.logger.WithError(errors.WithStack(err)).Error("failed to list PVRs")
return errors.Wrapf(err, "error to list PVRs")
}
@@ -925,21 +925,21 @@ func (c *PodVolumeRestoreReconciler) AttemptPVRResume(ctx context.Context, logge
}
if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress {
if pvr.Status.Node != c.nodeName {
logger.WithField("PVR", pvr.Name).WithField("current node", c.nodeName).Infof("PVR should be resumed by another node %s", pvr.Status.Node)
if pvr.Status.Node != r.nodeName {
logger.WithField("PVR", pvr.Name).WithField("current node", r.nodeName).Infof("PVR should be resumed by another node %s", pvr.Status.Node)
continue
}
err := funcResumeCancellablePVR(c, ctx, pvr, logger)
err := funcResumeCancellablePVR(r, ctx, pvr, logger)
if err == nil {
logger.WithField("PVR", pvr.Name).WithField("current node", c.nodeName).Info("Completed to resume in progress PVR")
logger.WithField("PVR", pvr.Name).WithField("current node", r.nodeName).Info("Completed to resume in progress PVR")
continue
}
logger.WithField("PVR", pvr.GetName()).WithError(err).Warn("Failed to resume data path for PVR, have to cancel it")
resumeErr := err
err = UpdatePVRWithRetry(ctx, c.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, logger.WithField("PVR", pvr.Name),
err = UpdatePVRWithRetry(ctx, r.client, types.NamespacedName{Namespace: pvr.Namespace, Name: pvr.Name}, logger.WithField("PVR", pvr.Name),
func(pvr *velerov1api.PodVolumeRestore) bool {
if pvr.Spec.Cancel {
return false
@@ -961,10 +961,10 @@ func (c *PodVolumeRestoreReconciler) AttemptPVRResume(ctx context.Context, logge
return nil
}
func (c *PodVolumeRestoreReconciler) resumeCancellableDataPath(ctx context.Context, pvr *velerov1api.PodVolumeRestore, log logrus.FieldLogger) error {
func (r *PodVolumeRestoreReconciler) resumeCancellableDataPath(ctx context.Context, pvr *velerov1api.PodVolumeRestore, log logrus.FieldLogger) error {
log.Info("Resume cancelable PVR")
res, err := c.exposer.GetExposed(ctx, getPVROwnerObject(pvr), c.client, c.nodeName, c.resourceTimeout)
res, err := r.exposer.GetExposed(ctx, getPVROwnerObject(pvr), r.client, r.nodeName, r.resourceTimeout)
if err != nil {
return errors.Wrapf(err, "error to get exposed PVR %s", pvr.Name)
}
@@ -974,13 +974,13 @@ func (c *PodVolumeRestoreReconciler) resumeCancellableDataPath(ctx context.Conte
}
callbacks := datapath.Callbacks{
OnCompleted: c.OnDataPathCompleted,
OnFailed: c.OnDataPathFailed,
OnCancelled: c.OnDataPathCancelled,
OnProgress: c.OnDataPathProgress,
OnCompleted: r.OnDataPathCompleted,
OnFailed: r.OnDataPathFailed,
OnCancelled: r.OnDataPathCancelled,
OnProgress: r.OnDataPathProgress,
}
asyncBR, err := c.dataPathMgr.CreateMicroServiceBRWatcher(ctx, c.client, c.kubeClient, c.mgr, datapath.TaskTypeRestore, pvr.Name, pvr.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, pvr.Name, callbacks, true, log)
asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeRestore, pvr.Name, pvr.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, pvr.Name, callbacks, true, log)
if err != nil {
return errors.Wrapf(err, "error to create asyncBR watcher for PVR %s", pvr.Name)
}
@@ -988,7 +988,7 @@ func (c *PodVolumeRestoreReconciler) resumeCancellableDataPath(ctx context.Conte
resumeComplete := false
defer func() {
if !resumeComplete {
c.closeDataPath(ctx, pvr.Name)
r.closeDataPath(ctx, pvr.Name)
}
}()

View File

@@ -1501,7 +1501,7 @@ func TestAttemptPVRResume(t *testing.T) {
r.client.Delete(ctx, test.pvr, &kbclient.DeleteOptions{})
}()
assert.NoError(t, r.client.Create(ctx, test.pvr))
require.NoError(t, r.client.Create(ctx, test.pvr))
dt := &pvbResumeTestHelper{
resumeErr: test.resumeErr,