From 627e2fede6ff364dc5736e1351174b825cd7b581 Mon Sep 17 00:00:00 2001
From: Lyndon-Li
Date: Wed, 21 Aug 2024 19:21:07 +0800
Subject: [PATCH] node-agent config for pod resources

Signed-off-by: Lyndon-Li
---
 changelogs/unreleased/8143-Lyndon-Li          |  1 +
 .../vgdp-micro-service/vgdp-micro-service.md  | 22 +++++++++++++++++++
 pkg/cmd/cli/nodeagent/server.go               | 17 ++++++++++++--
 pkg/controller/data_download_controller.go    |  6 +++--
 .../data_download_controller_test.go          |  6 ++---
 pkg/controller/data_upload_controller.go      |  7 ++++--
 pkg/controller/data_upload_controller_test.go |  2 +-
 pkg/exposer/csi_snapshot.go                   |  8 +++++--
 pkg/exposer/generic_restore.go                |  9 ++++----
 pkg/exposer/generic_restore_test.go           |  2 +-
 pkg/exposer/mocks/generic_restore.go          | 10 ++++-----
 pkg/nodeagent/node_agent.go                   | 10 +++++++++
 12 files changed, 78 insertions(+), 22 deletions(-)
 create mode 100644 changelogs/unreleased/8143-Lyndon-Li

diff --git a/changelogs/unreleased/8143-Lyndon-Li b/changelogs/unreleased/8143-Lyndon-Li
new file mode 100644
index 000000000..17bdec7eb
--- /dev/null
+++ b/changelogs/unreleased/8143-Lyndon-Li
@@ -0,0 +1 @@
+Fix issue #8134, allow configuring resource requests/limits for data mover micro service pods
\ No newline at end of file
diff --git a/design/vgdp-micro-service/vgdp-micro-service.md b/design/vgdp-micro-service/vgdp-micro-service.md
index 8d777e17e..c6d1e117e 100644
--- a/design/vgdp-micro-service/vgdp-micro-service.md
+++ b/design/vgdp-micro-service/vgdp-micro-service.md
@@ -176,6 +176,27 @@ Below diagram shows how VGDP logs are redirected:
 
 This log redirecting mechanism is thread safe since the hook acquires the write lock before writing the log buffer, so it guarantees that in the node-agent log there is no corruptions after redirecting the log, and the redirected logs and the original node-agent logs are not projected into each other.
 
+### Resource Control
+The CPU/memory resources of the backupPod/restorePod are configurable, which means users can configure resources per volume backup/restore.
+By default, the [Best Effort policy][5] is used, and users are allowed to change it through the ```node-agent-config``` configMap. Specifically, we add the following structures to the configMap:
+```
+type Configs struct {
+	// PodResources is the resource config for various types of pods launched by node-agent, i.e., data mover pods.
+	PodResources *PodResources `json:"podResources,omitempty"`
+}
+
+type PodResources struct {
+	CPURequest    string `json:"cpuRequest,omitempty"`
+	MemoryRequest string `json:"memoryRequest,omitempty"`
+	CPULimit      string `json:"cpuLimit,omitempty"`
+	MemoryLimit   string `json:"memoryLimit,omitempty"`
+}
+```
+The string values must match Kubernetes Quantity expressions; for each resource, the "request" value must not be larger than the "limit" value. If any one of the values fails these rules, all of the resource configurations are ignored.
+
+The configurations are loaded by node-agent at start time, so users can change the values in the configMap at any time, but the changes won't take effect until node-agent restarts.
+
+
 ## node-agent
 node-agent is still required. Even though VGDP is now not running inside node-agent, node-agent still hosts the data mover controller which reconciles DUCR/DDCR and operates DUCR/DDCR in other steps before the VGDP instance is started, i.e., Accept, Expose, etc. Privileged mode and root user are not required for node-agent anymore by Volume Snapshot Data Movement, however, they are still required by PVB(PodVolumeBackup) and PVR(PodVolumeRestore).
 Therefore, we will keep the node-agent deamonset as is, for any users who don't use PVB/PVR and have concern about the privileged mode/root user, they need to manually modify the deamonset spec to remove the dependencies.
@@ -198,4 +219,5 @@ CLI is not changed.
 [2]: ../volume-snapshot-data-movement/volume-snapshot-data-movement.md
 [3]: https://kubernetes.io/blog/2022/09/02/cosi-kubernetes-object-storage-management/
 [4]: ../Implemented/node-agent-concurrency.md
+[5]: https://kubernetes.io/docs/concepts/workloads/pods/pod-qos/
 
diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go
index 20a4654fb..7365c4e8c 100644
--- a/pkg/cmd/cli/nodeagent/server.go
+++ b/pkg/cmd/cli/nodeagent/server.go
@@ -60,6 +60,7 @@ import (
 	"github.com/vmware-tanzu/velero/pkg/nodeagent"
 	"github.com/vmware-tanzu/velero/pkg/repository"
 	"github.com/vmware-tanzu/velero/pkg/util/filesystem"
+	"github.com/vmware-tanzu/velero/pkg/util/kube"
 	"github.com/vmware-tanzu/velero/pkg/util/logging"
 
 	cacheutil "k8s.io/client-go/tools/cache"
@@ -295,19 +296,31 @@ func (s *nodeAgentServer) run() {
 	var loadAffinity *nodeagent.LoadAffinity
 	if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 {
 		loadAffinity = s.dataPathConfigs.LoadAffinity[0]
+		s.logger.Infof("Using customized loadAffinity %v", loadAffinity)
 	}
 
 	var backupPVCConfig map[string]nodeagent.BackupPVC
 	if s.dataPathConfigs != nil && s.dataPathConfigs.BackupPVCConfig != nil {
 		backupPVCConfig = s.dataPathConfigs.BackupPVCConfig
+		s.logger.Infof("Using customized backupPVC config %v", backupPVCConfig)
 	}
 
-	dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, backupPVCConfig, clock.RealClock{}, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
+	podResources := v1.ResourceRequirements{}
+	if s.dataPathConfigs != nil && s.dataPathConfigs.PodResources != nil {
+		if res, err := kube.ParseResourceRequirements(s.dataPathConfigs.PodResources.CPURequest, s.dataPathConfigs.PodResources.MemoryRequest,
+			s.dataPathConfigs.PodResources.CPULimit, s.dataPathConfigs.PodResources.MemoryLimit); err != nil {
+			s.logger.WithError(err).Warn("Pod resource requirements are invalid, ignore")
+		} else {
+			podResources = res
+			s.logger.Infof("Using customized pod resource requirements %v", s.dataPathConfigs.PodResources)
+		}
+	}
+
+	dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, backupPVCConfig, podResources, clock.RealClock{}, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
 	if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
 		s.logger.WithError(err).Fatal("Unable to create the data upload controller")
 	}
 
-	dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
+	dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, podResources, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
 	if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
 		s.logger.WithError(err).Fatal("Unable to create the data download controller")
 	}
diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go
index 990d7455b..7d6f6e0c6 100644
--- a/pkg/controller/data_download_controller.go
+++ b/pkg/controller/data_download_controller.go
@@ -60,12 +60,13 @@ type DataDownloadReconciler struct {
 	restoreExposer   exposer.GenericRestoreExposer
 	nodeName         string
 	dataPathMgr      *datapath.Manager
+	podResources     v1.ResourceRequirements
 	preparingTimeout time.Duration
 	metrics          *metrics.ServerMetrics
 }
 
 func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
-	nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
+	podResources v1.ResourceRequirements, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
 	return &DataDownloadReconciler{
 		client:           client,
 		kubeClient:       kubeClient,
@@ -75,6 +76,7 @@ func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeCl
 		nodeName:         nodeName,
 		restoreExposer:   exposer.NewGenericRestoreExposer(kubeClient, logger),
 		dataPathMgr:      dataPathMgr,
+		podResources:     podResources,
 		preparingTimeout: preparingTimeout,
 		metrics:          metrics,
 	}
@@ -179,7 +181,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
 		// Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
 		// but the pod maybe is not in the same node of the current controller, so we need to return it here.
 		// And then only the controller who is in the same node could do the rest work.
-		err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, dd.Spec.OperationTimeout.Duration)
+		err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, r.podResources, dd.Spec.OperationTimeout.Duration)
 		if err != nil {
 			if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil {
 				if !apierrors.IsNotFound(err) {
diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go
index a54135d03..a675b73cd 100644
--- a/pkg/controller/data_download_controller_test.go
+++ b/pkg/controller/data_download_controller_test.go
@@ -140,7 +140,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...
 
 	dataPathMgr := datapath.NewManager(1)
 
-	return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
+	return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, corev1.ResourceRequirements{}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
 }
 
 func TestDataDownloadReconcile(t *testing.T) {
@@ -441,7 +441,7 @@ func TestDataDownloadReconcile(t *testing.T) {
 			r.restoreExposer = func() exposer.GenericRestoreExposer {
 				ep := exposermockes.NewGenericRestoreExposer(t)
 				if test.isExposeErr {
-					ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("Error to expose restore exposer"))
+					ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("Error to expose restore exposer"))
 				} else if test.notNilExpose {
 					hostingPod := builder.ForPod("test-ns", "test-name").Volumes(&corev1.Volume{Name: "test-pvc"}).Result()
 					hostingPod.ObjectMeta.SetUID("test-uid")
@@ -959,7 +959,7 @@ func (dt *ddResumeTestHelper) resumeCancellableDataPath(_ *DataUploadReconciler,
 	return dt.resumeErr
 }
 
-func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, time.Duration) error {
+func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration) error {
 	return nil
 }
 
diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go
index 22dc59bd5..5b3169621 100644
--- a/pkg/controller/data_upload_controller.go
+++ b/pkg/controller/data_upload_controller.go
@@ -73,13 +73,14 @@ type DataUploadReconciler struct {
 	dataPathMgr      *datapath.Manager
 	loadAffinity     *nodeagent.LoadAffinity
 	backupPVCConfig  map[string]nodeagent.BackupPVC
+	podResources     corev1.ResourceRequirements
 	preparingTimeout time.Duration
 	metrics          *metrics.ServerMetrics
 }
 
 func NewDataUploadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface,
-	dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, backupPVCConfig map[string]nodeagent.BackupPVC, clock clocks.WithTickerAndDelayedExecution,
-	nodeName string, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler {
+	dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, backupPVCConfig map[string]nodeagent.BackupPVC, podResources corev1.ResourceRequirements,
+	clock clocks.WithTickerAndDelayedExecution, nodeName string, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler {
 	return &DataUploadReconciler{
 		client:           client,
 		mgr:              mgr,
@@ -92,6 +93,7 @@ func NewDataUploadReconciler(client client.Client, mgr manager.Manager, kubeClie
 		dataPathMgr:      dataPathMgr,
 		loadAffinity:     loadAffinity,
 		backupPVCConfig:  backupPVCConfig,
+		podResources:     podResources,
 		preparingTimeout: preparingTimeout,
 		metrics:          metrics,
 	}
@@ -795,6 +797,7 @@ func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload
 			VolumeSize:       pvc.Spec.Resources.Requests[corev1.ResourceStorage],
 			Affinity:         r.loadAffinity,
 			BackupPVCConfig:  r.backupPVCConfig,
+			Resources:        r.podResources,
 		}, nil
 	}
 	return nil, nil
diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go
index a6ee25574..5de53c100 100644
--- a/pkg/controller/data_upload_controller_test.go
+++ b/pkg/controller/data_upload_controller_test.go
@@ -232,7 +232,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
 	fakeKubeClient := clientgofake.NewSimpleClientset(daemonSet)
 
 	return NewDataUploadReconciler(fakeClient, nil, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, map[string]nodeagent.BackupPVC{},
-		testclocks.NewFakeClock(now), "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
+		corev1.ResourceRequirements{}, testclocks.NewFakeClock(now), "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
 }
 
 func dataUploadBuilder() *builder.DataUploadBuilder {
diff --git a/pkg/exposer/csi_snapshot.go b/pkg/exposer/csi_snapshot.go
index ccdb70dd2..630a8aad8 100644
--- a/pkg/exposer/csi_snapshot.go
+++ b/pkg/exposer/csi_snapshot.go
@@ -70,6 +70,9 @@ type CSISnapshotExposeParam struct {
 
 	// BackupPVCConfig is the config for backupPVC (intermediate PVC) of snapshot data movement
 	BackupPVCConfig map[string]nodeagent.BackupPVC
+
+	// Resources defines the resource requirements of the hosting pod
+	Resources corev1.ResourceRequirements
 }
 
 // CSISnapshotExposeWaitParam define the input param for WaitExposed of CSI snapshots
@@ -191,7 +194,7 @@ func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.Obje
 		}
 	}()
 
-	backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.OperationTimeout, csiExposeParam.HostingPodLabels, csiExposeParam.Affinity)
+	backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.OperationTimeout, csiExposeParam.HostingPodLabels, csiExposeParam.Affinity, csiExposeParam.Resources)
 	if err != nil {
 		return errors.Wrap(err, "error to create backup pod")
 	}
@@ -423,7 +426,7 @@ func (e *csiSnapshotExposer) createBackupPVC(ctx context.Context, ownerObject co
 }
 
 func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject corev1.ObjectReference, backupPVC *corev1.PersistentVolumeClaim, operationTimeout time.Duration,
-	label map[string]string, affinity *nodeagent.LoadAffinity) (*corev1.Pod, error) {
+	label map[string]string, affinity *nodeagent.LoadAffinity, resources corev1.ResourceRequirements) (*corev1.Pod, error) {
 	podName := ownerObject.Name
 
 	containerName := string(ownerObject.UID)
@@ -513,6 +516,7 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co
 					VolumeMounts:  volumeMounts,
 					VolumeDevices: volumeDevices,
 					Env:           podInfo.env,
+					Resources:     resources,
 				},
 			},
 			ServiceAccountName: podInfo.serviceAccount,
diff --git a/pkg/exposer/generic_restore.go b/pkg/exposer/generic_restore.go
index da8ad9b8f..b5c927602 100644
--- a/pkg/exposer/generic_restore.go
+++ b/pkg/exposer/generic_restore.go
@@ -37,7 +37,7 @@ import (
 // GenericRestoreExposer is the interfaces for a generic restore exposer
 type GenericRestoreExposer interface {
 	// Expose starts the process to a restore expose, the expose process may take long time
-	Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, time.Duration) error
+	Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration) error
 
 	// GetExposed polls the status of the expose.
 	// If the expose is accessible by the current caller, it waits the expose ready and returns the expose result.
@@ -69,7 +69,7 @@ type genericRestoreExposer struct {
 	log logrus.FieldLogger
 }
 
-func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, hostingPodLabels map[string]string, timeout time.Duration) error {
+func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, hostingPodLabels map[string]string, resources corev1.ResourceRequirements, timeout time.Duration) error {
 	curLog := e.log.WithFields(logrus.Fields{
 		"owner":      ownerObject.Name,
 		"target PVC": targetPVCName,
@@ -87,7 +87,7 @@ func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.O
 		return errors.Errorf("Target PVC %s/%s has already been bound, abort", sourceNamespace, targetPVCName)
 	}
 
-	restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, timeout, hostingPodLabels, selectedNode)
+	restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, timeout, hostingPodLabels, selectedNode, resources)
 	if err != nil {
 		return errors.Wrapf(err, "error to create restore pod")
 	}
@@ -297,7 +297,7 @@ func (e *genericRestoreExposer) RebindVolume(ctx context.Context, ownerObject co
 }
 
 func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObject corev1.ObjectReference, targetPVC *corev1.PersistentVolumeClaim,
-	operationTimeout time.Duration, label map[string]string, selectedNode string) (*corev1.Pod, error) {
+	operationTimeout time.Duration, label map[string]string, selectedNode string, resources corev1.ResourceRequirements) (*corev1.Pod, error) {
 	restorePodName := ownerObject.Name
 	restorePVCName := ownerObject.Name
 
@@ -370,6 +370,7 @@ func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObjec
 					VolumeMounts:  volumeMounts,
 					VolumeDevices: volumeDevices,
 					Env:           podInfo.env,
+					Resources:     resources,
 				},
 			},
 			ServiceAccountName: podInfo.serviceAccount,
diff --git a/pkg/exposer/generic_restore_test.go b/pkg/exposer/generic_restore_test.go
index 02fa85728..4c3221b5c 100644
--- a/pkg/exposer/generic_restore_test.go
+++ b/pkg/exposer/generic_restore_test.go
@@ -180,7 +180,7 @@ func TestRestoreExpose(t *testing.T) {
 				}
 			}
 
-			err := exposer.Expose(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, map[string]string{}, time.Millisecond)
+			err := exposer.Expose(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, map[string]string{}, corev1.ResourceRequirements{}, time.Millisecond)
 			assert.EqualError(t, err, test.err)
 		})
 	}
diff --git a/pkg/exposer/mocks/generic_restore.go b/pkg/exposer/mocks/generic_restore.go
index 6981f5cef..e0b76d6e7 100644
--- a/pkg/exposer/mocks/generic_restore.go
+++ b/pkg/exposer/mocks/generic_restore.go
@@ -26,17 +26,17 @@ func (_m *GenericRestoreExposer) CleanUp(_a0 context.Context, _a1 v1.ObjectRefer
 	_m.Called(_a0, _a1)
 }
 
-// Expose provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4, _a5
-func (_m *GenericRestoreExposer) Expose(_a0 context.Context, _a1 v1.ObjectReference, _a2 string, _a3 string, _a4 map[string]string, _a5 time.Duration) error {
-	ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5)
+// Expose provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4, _a5, _a6
+func (_m *GenericRestoreExposer) Expose(_a0 context.Context, _a1 v1.ObjectReference, _a2 string, _a3 string, _a4 map[string]string, _a5 v1.ResourceRequirements, _a6 time.Duration) error {
+	ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5, _a6)
 
 	if len(ret) == 0 {
 		panic("no return value specified for Expose")
 	}
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, string, string, map[string]string, time.Duration) error); ok {
-		r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5)
+	if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, string, string, map[string]string, v1.ResourceRequirements, time.Duration) error); ok {
+		r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6)
 	} else {
 		r0 = ret.Error(0)
 	}
diff --git a/pkg/nodeagent/node_agent.go b/pkg/nodeagent/node_agent.go
index 78cdc2771..f7a9629dc 100644
--- a/pkg/nodeagent/node_agent.go
+++ b/pkg/nodeagent/node_agent.go
@@ -70,6 +70,13 @@ type BackupPVC struct {
 	ReadOnly bool `json:"readOnly,omitempty"`
 }
 
+type PodResources struct {
+	CPURequest    string `json:"cpuRequest,omitempty"`
+	MemoryRequest string `json:"memoryRequest,omitempty"`
+	CPULimit      string `json:"cpuLimit,omitempty"`
+	MemoryLimit   string `json:"memoryLimit,omitempty"`
+}
+
 type Configs struct {
 	// LoadConcurrency is the config for data path load concurrency per node.
 	LoadConcurrency *LoadConcurrency `json:"loadConcurrency,omitempty"`
@@ -79,6 +86,9 @@ type Configs struct {
 
 	// BackupPVCConfig is the config for backupPVC (intermediate PVC) of snapshot data movement
 	BackupPVCConfig map[string]BackupPVC `json:"backupPVC,omitempty"`
+
+	// PodResources is the resource config for various types of pods launched by node-agent, i.e., data mover pods.
+	PodResources *PodResources `json:"podResources,omitempty"`
 }
 
 // IsRunning checks if the node agent daemonset is running properly. If not, return the error found
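
As a supplement to the `Resource Control` section of the design doc above, the sketch below illustrates the intended semantics of the new `podResources` config: every non-empty value must parse as a Kubernetes Quantity, a configured request must not exceed the corresponding limit, and any violation causes the whole config to be ignored so the data mover pods keep the default BestEffort QoS. This is a minimal, hypothetical sketch, not Velero's actual `kube.ParseResourceRequirements` helper; the function names and sample values here are illustrative only. The structs mirror the ones this patch adds to `pkg/nodeagent/node_agent.go`, and the JSON string stands in for what a user would place in the `node-agent-config` configMap.

```
package main

import (
	"encoding/json"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// PodResources mirrors the structure this patch adds to the node-agent-config configMap.
type PodResources struct {
	CPURequest    string `json:"cpuRequest,omitempty"`
	MemoryRequest string `json:"memoryRequest,omitempty"`
	CPULimit      string `json:"cpuLimit,omitempty"`
	MemoryLimit   string `json:"memoryLimit,omitempty"`
}

type Configs struct {
	PodResources *PodResources `json:"podResources,omitempty"`
}

// toRequirements parses the string values into corev1.ResourceRequirements and enforces
// request <= limit per resource; any failure rejects the whole config, matching the
// "ignore everything on error" behavior described in the design doc. (Hypothetical helper.)
func toRequirements(pr PodResources) (corev1.ResourceRequirements, error) {
	out := corev1.ResourceRequirements{
		Requests: corev1.ResourceList{},
		Limits:   corev1.ResourceList{},
	}

	// set parses one value as a Kubernetes Quantity and stores it in the given list.
	set := func(val string, name corev1.ResourceName, list corev1.ResourceList) error {
		if val == "" {
			return nil // unset values are simply omitted for that resource
		}
		q, err := resource.ParseQuantity(val)
		if err != nil {
			return fmt.Errorf("invalid quantity %q for %s: %w", val, name, err)
		}
		list[name] = q
		return nil
	}

	for _, f := range []struct {
		val  string
		name corev1.ResourceName
		list corev1.ResourceList
	}{
		{pr.CPURequest, corev1.ResourceCPU, out.Requests},
		{pr.MemoryRequest, corev1.ResourceMemory, out.Requests},
		{pr.CPULimit, corev1.ResourceCPU, out.Limits},
		{pr.MemoryLimit, corev1.ResourceMemory, out.Limits},
	} {
		if err := set(f.val, f.name, f.list); err != nil {
			return corev1.ResourceRequirements{}, err
		}
	}

	// For each resource, a configured request must not exceed the configured limit.
	for _, name := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} {
		req, hasReq := out.Requests[name]
		lim, hasLim := out.Limits[name]
		if hasReq && hasLim && req.Cmp(lim) > 0 {
			return corev1.ResourceRequirements{}, fmt.Errorf("%s request %s exceeds limit %s", name, req.String(), lim.String())
		}
	}

	return out, nil
}

func main() {
	// Illustrative example of the podResources section of the node-agent-config configMap;
	// the quantities below are placeholders, not recommendations.
	raw := `{"podResources":{"cpuRequest":"500m","cpuLimit":"2","memoryRequest":"256Mi","memoryLimit":"1Gi"}}`

	cfg := Configs{}
	if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
		panic(err)
	}

	reqs, err := toRequirements(*cfg.PodResources)
	if err != nil {
		// On any invalid value the whole config is dropped and the data mover pods
		// fall back to the default BestEffort QoS.
		fmt.Println("ignoring podResources:", err)
		return
	}
	fmt.Printf("data mover pod resources: %+v\n", reqs)
}
```

The resulting `corev1.ResourceRequirements` is what the reconcilers in this patch pass through the exposers and onto the backupPod/restorePod container spec; changing the configMap only takes effect after node-agent restarts, since the config is read once at startup.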