diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 513eb89de..8aa52b1c5 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -61,6 +61,8 @@ import ( "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/logging" + + cacheutil "k8s.io/client-go/tools/cache" ) var ( @@ -309,14 +311,17 @@ func (s *nodeAgentServer) run() { } go func() { - s.mgr.GetCache().WaitForCacheSync(s.ctx) - - if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil { - s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume") + if err := s.waitCacheForResume(); err != nil { + s.logger.WithError(err).Error("Failed to wait cache for resume, will not resume DU/DD") + return } - if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil { - s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume") + if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil { + s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt data upload resume") + } + + if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil { + s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt data download resume") } }() @@ -327,6 +332,29 @@ func (s *nodeAgentServer) run() { } } +func (s *nodeAgentServer) waitCacheForResume() error { + podInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &v1.Pod{}) + if err != nil { + return errors.Wrap(err, "error getting pod informer") + } + + duInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &velerov2alpha1api.DataUpload{}) + if err != nil { + return errors.Wrap(err, "error getting du informer") + } + + ddInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &velerov2alpha1api.DataDownload{}) + if err != nil { + return errors.Wrap(err, "error getting dd informer") + } + + if !cacheutil.WaitForCacheSync(s.ctx.Done(), podInformer.HasSynced, duInformer.HasSynced, ddInformer.HasSynced) { + return errors.New("error waiting informer synced") + } + + return nil +} + // validatePodVolumesHostPath validates that the pod volumes path contains a // directory for each Pod running on this node func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface) error { diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index 0b9805a0a..990d7455b 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -767,9 +767,9 @@ func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, name var funcResumeCancellableDataRestore = (*DataDownloadReconciler).resumeCancellableDataPath -func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error { +func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, logger *logrus.Entry, ns string) error { dataDownloads := &velerov2alpha1api.DataDownloadList{} - if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil { + if err := r.client.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil { r.logger.WithError(errors.WithStack(err)).Error("failed to list datadownloads") return errors.Wrapf(err, "error to list datadownloads") } @@ -795,7 +795,7 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, logger.WithField("datadownload", dd.GetName()).WithError(err).Warn("Failed to resume data path for dd, have to cancel it") resumeErr := err - err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name), + err = UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) bool { if dataDownload.Spec.Cancel { return false @@ -812,7 +812,7 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted { r.logger.WithField("datadownload", dd.GetName()).Warn("Cancel dd under Accepted phase") - err := UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, + err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, r.logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) bool { if dataDownload.Spec.Cancel { return false diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index bb9fe6f7c..a54135d03 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -1079,7 +1079,7 @@ func TestAttemptDataDownloadResume(t *testing.T) { funcResumeCancellableDataBackup = dt.resumeCancellableDataPath // Run the test - err = r.AttemptDataDownloadResume(ctx, r.client, r.logger.WithField("name", test.name), test.dd.Namespace) + err = r.AttemptDataDownloadResume(ctx, r.logger.WithField("name", test.name), test.dd.Namespace) if test.expectedError != "" { assert.EqualError(t, err, test.expectedError) diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 4c26baf38..22dc59bd5 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -867,9 +867,9 @@ func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namesp var funcResumeCancellableDataBackup = (*DataUploadReconciler).resumeCancellableDataPath -func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error { +func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, logger *logrus.Entry, ns string) error { dataUploads := &velerov2alpha1api.DataUploadList{} - if err := cli.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil { + if err := r.client.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil { r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads") return errors.Wrapf(err, "error to list datauploads") } @@ -895,7 +895,7 @@ func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli logger.WithField("dataupload", du.GetName()).WithError(err).Warn("Failed to resume data path for du, have to cancel it") resumeErr := err - err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name), + err = UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name), func(dataUpload *velerov2alpha1api.DataUpload) bool { if dataUpload.Spec.Cancel { return false @@ -912,7 +912,7 @@ func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted { r.logger.WithField("dataupload", du.GetName()).Warn("Cancel du under Accepted phase") - err := UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name), + err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name), func(dataUpload *velerov2alpha1api.DataUpload) bool { if dataUpload.Spec.Cancel { return false diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index b373a7a6a..a6ee25574 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -1127,7 +1127,7 @@ func TestAttemptDataUploadResume(t *testing.T) { funcResumeCancellableDataBackup = dt.resumeCancellableDataPath // Run the test - err = r.AttemptDataUploadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace) + err = r.AttemptDataUploadResume(ctx, r.logger.WithField("name", test.name), test.du.Namespace) if test.expectedError != "" { assert.EqualError(t, err, test.expectedError)