mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-01-05 04:55:22 +00:00
node-agent config for pod resources
Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
1 changelogs/unreleased/8143-Lyndon-Li Normal file
@@ -0,0 +1 @@
Fix issue #8134, allow to config resource request/limit for data mover micro service pods
@@ -176,6 +176,27 @@ Below diagram shows how VGDP logs are redirected:

This log redirecting mechanism is thread safe: the hook acquires the write lock before writing to the log buffer, which guarantees that the node-agent log is not corrupted by the redirection and that the redirected logs and the original node-agent logs are not mixed into each other.

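Below is a minimal sketch of such a hook. It is an illustration only, not Velero's actual hook implementation; it just assumes a logrus hook and a buffer shared between loggers, and shows how taking the lock inside `Fire` keeps concurrent entries from interleaving.

```
package main

import (
    "bytes"
    "sync"

    "github.com/sirupsen/logrus"
)

// redirectHook forwards every log entry to a shared buffer while holding a
// write lock, so concurrent loggers never interleave partial lines.
type redirectHook struct {
    mu  *sync.Mutex
    buf *bytes.Buffer
}

func (h *redirectHook) Levels() []logrus.Level { return logrus.AllLevels }

func (h *redirectHook) Fire(entry *logrus.Entry) error {
    line, err := entry.String() // formatted log line
    if err != nil {
        return err
    }

    h.mu.Lock()
    defer h.mu.Unlock()
    _, err = h.buf.WriteString(line)
    return err
}

func main() {
    shared := &bytes.Buffer{}
    mu := &sync.Mutex{}

    logger := logrus.New()
    logger.AddHook(&redirectHook{mu: mu, buf: shared})
    logger.Info("redirected entry")
}
```
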
### Resource Control

The CPU/memory resources of the backupPod/restorePod are configurable, which means users can control the resources allocated to each volume backup/restore.

By default, the [Best Effort policy][5] is used, and users are allowed to change it through the ```node-agent-config``` configMap. Specifically, we add the following structures to the configMap:

```
type Configs struct {
    // PodResources is the resource config for various types of pods launched by node-agent, i.e., data mover pods.
    PodResources *PodResources `json:"podResources,omitempty"`
}

type PodResources struct {
    CPURequest    string `json:"cpuRequest,omitempty"`
    MemoryRequest string `json:"memoryRequest,omitempty"`
    CPULimit      string `json:"cpuLimit,omitempty"`
    MemoryLimit   string `json:"memoryLimit,omitempty"`
}
```

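To make the expected configMap payload concrete, the sketch below marshals local copies of the structures above and prints the JSON field names implied by their tags. The quantity values ("500m", "128Mi", etc.) are illustrative assumptions; consult the node-agent-config documentation for the exact configMap key and data format.

```
package main

import (
    "encoding/json"
    "fmt"
)

// Local copies of the config structures, mirroring the design above.
type PodResources struct {
    CPURequest    string `json:"cpuRequest,omitempty"`
    MemoryRequest string `json:"memoryRequest,omitempty"`
    CPULimit      string `json:"cpuLimit,omitempty"`
    MemoryLimit   string `json:"memoryLimit,omitempty"`
}

type Configs struct {
    PodResources *PodResources `json:"podResources,omitempty"`
}

func main() {
    cfg := Configs{
        PodResources: &PodResources{
            CPURequest:    "500m",
            MemoryRequest: "128Mi",
            CPULimit:      "2",
            MemoryLimit:   "1Gi",
        },
    }

    out, _ := json.Marshal(cfg)
    // Output:
    // {"podResources":{"cpuRequest":"500m","memoryRequest":"128Mi","cpuLimit":"2","memoryLimit":"1Gi"}}
    fmt.Println(string(out))
}
```
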
The string values must match Kubernetes Quantity expressions, and for each resource the "request" value must not be larger than the "limit" value. If any one of the values fails validation, all of the resource configurations are ignored.

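Velero performs this parsing and validation in a helper (```kube.ParseResourceRequirements```, used by the node-agent code shown later in this commit). The following is a simplified, stand-alone sketch of the same rule: parse each value as a Kubernetes Quantity and reject the whole configuration if any value is invalid or a request exceeds its limit. The function name ```parsePodResources``` is hypothetical.

```
package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
)

// parsePodResources is a simplified sketch (not Velero's actual helper). It
// builds ResourceRequirements from the four string values and returns an
// error if any value is not a valid Quantity or a request exceeds its limit.
func parsePodResources(cpuReq, memReq, cpuLimit, memLimit string) (corev1.ResourceRequirements, error) {
    res := corev1.ResourceRequirements{
        Requests: corev1.ResourceList{},
        Limits:   corev1.ResourceList{},
    }

    parse := func(value string, name corev1.ResourceName, list corev1.ResourceList) error {
        if value == "" {
            return nil // unset values fall back to the default (Best Effort)
        }
        q, err := resource.ParseQuantity(value)
        if err != nil {
            return fmt.Errorf("invalid quantity %q for %s: %w", value, name, err)
        }
        list[name] = q
        return nil
    }

    if err := parse(cpuReq, corev1.ResourceCPU, res.Requests); err != nil {
        return corev1.ResourceRequirements{}, err
    }
    if err := parse(memReq, corev1.ResourceMemory, res.Requests); err != nil {
        return corev1.ResourceRequirements{}, err
    }
    if err := parse(cpuLimit, corev1.ResourceCPU, res.Limits); err != nil {
        return corev1.ResourceRequirements{}, err
    }
    if err := parse(memLimit, corev1.ResourceMemory, res.Limits); err != nil {
        return corev1.ResourceRequirements{}, err
    }

    // Reject request > limit for each resource that has both values set.
    for _, name := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} {
        req, hasReq := res.Requests[name]
        limit, hasLimit := res.Limits[name]
        if hasReq && hasLimit && req.Cmp(limit) > 0 {
            return corev1.ResourceRequirements{}, fmt.Errorf("%s request %s exceeds limit %s", name, req.String(), limit.String())
        }
    }

    return res, nil
}

func main() {
    // CPU request "2" exceeds CPU limit "1", so the whole config is rejected.
    if _, err := parsePodResources("2", "128Mi", "1", "1Gi"); err != nil {
        fmt.Println("config ignored:", err)
    }
}
```
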

The configurations are loaded by node-agent at start time, so users can change the values in the configMap at any time, but the changes won't take effect until node-agent restarts.

## node-agent

node-agent is still required. Even though VGDP no longer runs inside node-agent, node-agent still hosts the data mover controller, which reconciles DUCR/DDCR and operates DUCR/DDCR in the other steps before the VGDP instance is started, e.g., Accept, Expose, etc.

Privileged mode and root user are no longer required for node-agent by Volume Snapshot Data Movement; however, they are still required by PVB (PodVolumeBackup) and PVR (PodVolumeRestore). Therefore, we will keep the node-agent daemonset as is. Users who don't use PVB/PVR and have concerns about the privileged mode/root user need to manually modify the daemonset spec to remove the dependencies.

@@ -198,4 +219,5 @@ CLI is not changed.

[2]: ../volume-snapshot-data-movement/volume-snapshot-data-movement.md
[3]: https://kubernetes.io/blog/2022/09/02/cosi-kubernetes-object-storage-management/
[4]: ../Implemented/node-agent-concurrency.md
[5]: https://kubernetes.io/docs/concepts/workloads/pods/pod-qos/

@@ -60,6 +60,7 @@ import (
    "github.com/vmware-tanzu/velero/pkg/nodeagent"
    "github.com/vmware-tanzu/velero/pkg/repository"
    "github.com/vmware-tanzu/velero/pkg/util/filesystem"
    "github.com/vmware-tanzu/velero/pkg/util/kube"
    "github.com/vmware-tanzu/velero/pkg/util/logging"

    cacheutil "k8s.io/client-go/tools/cache"

@@ -295,19 +296,31 @@ func (s *nodeAgentServer) run() {
    var loadAffinity *nodeagent.LoadAffinity
    if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 {
        loadAffinity = s.dataPathConfigs.LoadAffinity[0]
        s.logger.Infof("Using customized loadAffinity %v", loadAffinity)
    }

    var backupPVCConfig map[string]nodeagent.BackupPVC
    if s.dataPathConfigs != nil && s.dataPathConfigs.BackupPVCConfig != nil {
        backupPVCConfig = s.dataPathConfigs.BackupPVCConfig
        s.logger.Infof("Using customized backupPVC config %v", backupPVCConfig)
    }

    dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, backupPVCConfig, clock.RealClock{}, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
    podResources := v1.ResourceRequirements{}
    if s.dataPathConfigs != nil && s.dataPathConfigs.PodResources != nil {
        if res, err := kube.ParseResourceRequirements(s.dataPathConfigs.PodResources.CPURequest, s.dataPathConfigs.PodResources.MemoryRequest, s.dataPathConfigs.PodResources.CPULimit, s.dataPathConfigs.PodResources.MemoryLimit); err != nil {
            s.logger.WithError(err).Warn("Pod resource requirements are invalid, ignore")
        } else {
            podResources = res
            s.logger.Infof("Using customized pod resource requirements %v", s.dataPathConfigs.PodResources)
        }
    }

    dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, backupPVCConfig, podResources, clock.RealClock{}, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
    if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
        s.logger.WithError(err).Fatal("Unable to create the data upload controller")
    }

    dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
    dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, podResources, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
    if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
        s.logger.WithError(err).Fatal("Unable to create the data download controller")
    }

@@ -60,12 +60,13 @@ type DataDownloadReconciler struct {
    restoreExposer   exposer.GenericRestoreExposer
    nodeName         string
    dataPathMgr      *datapath.Manager
    podResources     v1.ResourceRequirements
    preparingTimeout time.Duration
    metrics          *metrics.ServerMetrics
}

func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
    nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
    podResources v1.ResourceRequirements, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
    return &DataDownloadReconciler{
        client:     client,
        kubeClient: kubeClient,
@@ -75,6 +76,7 @@ func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeCl
        nodeName:         nodeName,
        restoreExposer:   exposer.NewGenericRestoreExposer(kubeClient, logger),
        dataPathMgr:      dataPathMgr,
        podResources:     podResources,
        preparingTimeout: preparingTimeout,
        metrics:          metrics,
    }

@@ -179,7 +181,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
        // Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
        // but the pod maybe is not in the same node of the current controller, so we need to return it here.
        // And then only the controller who is in the same node could do the rest work.
        err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, dd.Spec.OperationTimeout.Duration)
        err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, r.podResources, dd.Spec.OperationTimeout.Duration)
        if err != nil {
            if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil {
                if !apierrors.IsNotFound(err) {

@@ -140,7 +140,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...

    dataPathMgr := datapath.NewManager(1)

    return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
    return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, corev1.ResourceRequirements{}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}

func TestDataDownloadReconcile(t *testing.T) {

@@ -441,7 +441,7 @@ func TestDataDownloadReconcile(t *testing.T) {
            r.restoreExposer = func() exposer.GenericRestoreExposer {
                ep := exposermockes.NewGenericRestoreExposer(t)
                if test.isExposeErr {
                    ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("Error to expose restore exposer"))
                    ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("Error to expose restore exposer"))
                } else if test.notNilExpose {
                    hostingPod := builder.ForPod("test-ns", "test-name").Volumes(&corev1.Volume{Name: "test-pvc"}).Result()
                    hostingPod.ObjectMeta.SetUID("test-uid")

@@ -959,7 +959,7 @@ func (dt *ddResumeTestHelper) resumeCancellableDataPath(_ *DataUploadReconciler,
    return dt.resumeErr
}

func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, time.Duration) error {
func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration) error {
    return nil
}

@@ -73,13 +73,14 @@ type DataUploadReconciler struct {
    dataPathMgr      *datapath.Manager
    loadAffinity     *nodeagent.LoadAffinity
    backupPVCConfig  map[string]nodeagent.BackupPVC
    podResources     corev1.ResourceRequirements
    preparingTimeout time.Duration
    metrics          *metrics.ServerMetrics
}

func NewDataUploadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface,
    dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, backupPVCConfig map[string]nodeagent.BackupPVC, clock clocks.WithTickerAndDelayedExecution,
    nodeName string, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler {
    dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, backupPVCConfig map[string]nodeagent.BackupPVC, podResources corev1.ResourceRequirements,
    clock clocks.WithTickerAndDelayedExecution, nodeName string, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler {
    return &DataUploadReconciler{
        client: client,
        mgr:    mgr,
@@ -92,6 +93,7 @@ func NewDataUploadReconciler(client client.Client, mgr manager.Manager, kubeClie
        dataPathMgr:      dataPathMgr,
        loadAffinity:     loadAffinity,
        backupPVCConfig:  backupPVCConfig,
        podResources:     podResources,
        preparingTimeout: preparingTimeout,
        metrics:          metrics,
    }

@@ -795,6 +797,7 @@ func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload
            VolumeSize:      pvc.Spec.Resources.Requests[corev1.ResourceStorage],
            Affinity:        r.loadAffinity,
            BackupPVCConfig: r.backupPVCConfig,
            Resources:       r.podResources,
        }, nil
    }
    return nil, nil

@@ -232,7 +232,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
    fakeKubeClient := clientgofake.NewSimpleClientset(daemonSet)

    return NewDataUploadReconciler(fakeClient, nil, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, map[string]nodeagent.BackupPVC{},
        testclocks.NewFakeClock(now), "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
        corev1.ResourceRequirements{}, testclocks.NewFakeClock(now), "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}

func dataUploadBuilder() *builder.DataUploadBuilder {

@@ -70,6 +70,9 @@ type CSISnapshotExposeParam struct {

    // BackupPVCConfig is the config for backupPVC (intermediate PVC) of snapshot data movement
    BackupPVCConfig map[string]nodeagent.BackupPVC

    // Resources defines the resource requirements of the hosting pod
    Resources corev1.ResourceRequirements
}

// CSISnapshotExposeWaitParam define the input param for WaitExposed of CSI snapshots

@@ -191,7 +194,7 @@ func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.Obje
        }
    }()

    backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.OperationTimeout, csiExposeParam.HostingPodLabels, csiExposeParam.Affinity)
    backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.OperationTimeout, csiExposeParam.HostingPodLabels, csiExposeParam.Affinity, csiExposeParam.Resources)
    if err != nil {
        return errors.Wrap(err, "error to create backup pod")
    }

@@ -423,7 +426,7 @@ func (e *csiSnapshotExposer) createBackupPVC(ctx context.Context, ownerObject co
}

func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject corev1.ObjectReference, backupPVC *corev1.PersistentVolumeClaim, operationTimeout time.Duration,
    label map[string]string, affinity *nodeagent.LoadAffinity) (*corev1.Pod, error) {
    label map[string]string, affinity *nodeagent.LoadAffinity, resources corev1.ResourceRequirements) (*corev1.Pod, error) {
    podName := ownerObject.Name

    containerName := string(ownerObject.UID)

@@ -513,6 +516,7 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co
                VolumeMounts:  volumeMounts,
                VolumeDevices: volumeDevices,
                Env:           podInfo.env,
                Resources:     resources,
            },
        },
        ServiceAccountName: podInfo.serviceAccount,

@@ -37,7 +37,7 @@ import (
// GenericRestoreExposer is the interfaces for a generic restore exposer
type GenericRestoreExposer interface {
    // Expose starts the process to a restore expose, the expose process may take long time
    Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, time.Duration) error
    Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration) error

    // GetExposed polls the status of the expose.
    // If the expose is accessible by the current caller, it waits the expose ready and returns the expose result.

@@ -69,7 +69,7 @@ type genericRestoreExposer struct {
    log logrus.FieldLogger
}

func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, hostingPodLabels map[string]string, timeout time.Duration) error {
func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, hostingPodLabels map[string]string, resources corev1.ResourceRequirements, timeout time.Duration) error {
    curLog := e.log.WithFields(logrus.Fields{
        "owner":      ownerObject.Name,
        "target PVC": targetPVCName,

@@ -87,7 +87,7 @@ func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.O
        return errors.Errorf("Target PVC %s/%s has already been bound, abort", sourceNamespace, targetPVCName)
    }

    restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, timeout, hostingPodLabels, selectedNode)
    restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, timeout, hostingPodLabels, selectedNode, resources)
    if err != nil {
        return errors.Wrapf(err, "error to create restore pod")
    }

@@ -297,7 +297,7 @@ func (e *genericRestoreExposer) RebindVolume(ctx context.Context, ownerObject co
}

func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObject corev1.ObjectReference, targetPVC *corev1.PersistentVolumeClaim,
    operationTimeout time.Duration, label map[string]string, selectedNode string) (*corev1.Pod, error) {
    operationTimeout time.Duration, label map[string]string, selectedNode string, resources corev1.ResourceRequirements) (*corev1.Pod, error) {
    restorePodName := ownerObject.Name
    restorePVCName := ownerObject.Name

@@ -370,6 +370,7 @@ func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObjec
                VolumeMounts:  volumeMounts,
                VolumeDevices: volumeDevices,
                Env:           podInfo.env,
                Resources:     resources,
            },
        },
        ServiceAccountName: podInfo.serviceAccount,

@@ -180,7 +180,7 @@ func TestRestoreExpose(t *testing.T) {
                }
            }

            err := exposer.Expose(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, map[string]string{}, time.Millisecond)
            err := exposer.Expose(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, map[string]string{}, corev1.ResourceRequirements{}, time.Millisecond)
            assert.EqualError(t, err, test.err)
        })
    }

@@ -26,17 +26,17 @@ func (_m *GenericRestoreExposer) CleanUp(_a0 context.Context, _a1 v1.ObjectRefer
    _m.Called(_a0, _a1)
}

// Expose provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4, _a5
func (_m *GenericRestoreExposer) Expose(_a0 context.Context, _a1 v1.ObjectReference, _a2 string, _a3 string, _a4 map[string]string, _a5 time.Duration) error {
    ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5)
// Expose provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4, _a5, _a6
func (_m *GenericRestoreExposer) Expose(_a0 context.Context, _a1 v1.ObjectReference, _a2 string, _a3 string, _a4 map[string]string, _a5 v1.ResourceRequirements, _a6 time.Duration) error {
    ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5, _a6)

    if len(ret) == 0 {
        panic("no return value specified for Expose")
    }

    var r0 error
    if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, string, string, map[string]string, time.Duration) error); ok {
        r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5)
    if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, string, string, map[string]string, v1.ResourceRequirements, time.Duration) error); ok {
        r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6)
    } else {
        r0 = ret.Error(0)
    }

@@ -70,6 +70,13 @@ type BackupPVC struct {
    ReadOnly bool `json:"readOnly,omitempty"`
}

type PodResources struct {
    CPURequest    string `json:"cpuRequest,omitempty"`
    MemoryRequest string `json:"memoryRequest,omitempty"`
    CPULimit      string `json:"cpuLimit,omitempty"`
    MemoryLimit   string `json:"memoryLimit,omitempty"`
}

type Configs struct {
    // LoadConcurrency is the config for data path load concurrency per node.
    LoadConcurrency *LoadConcurrency `json:"loadConcurrency,omitempty"`
@@ -79,6 +86,9 @@ type Configs struct {
    // BackupPVCConfig is the config for backupPVC (intermediate PVC) of snapshot data movement
    BackupPVCConfig map[string]BackupPVC `json:"backupPVC,omitempty"`

    // PodResources is the resource config for various types of pods launched by node-agent, i.e., data mover pods.
    PodResources *PodResources `json:"podResources,omitempty"`
}

// IsRunning checks if the node agent daemonset is running properly. If not, return the error found