Issue 8344: constrain data path expose (#9064)

* issue 8344: constrain data path exposure.

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
Authored by lyndon-li on 2025-07-18 13:32:45 +08:00, committed by GitHub
Parent: 29a8bc4492
Commit: 06d305ea47
16 changed files with 671 additions and 57 deletions
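For orientation: this change gates every new VGDP (Velero Generic Data Path) load behind a shared VgdpCounter. Each reconciler (DataUpload, DataDownload, PodVolumeBackup, PodVolumeRestore) consults the counter before accepting a CR in the New phase and requeues if too many loads are already being exposed. The helper below is only an illustrative sketch of that gate: requeueIfConstrained is a name invented here, while VgdpCounter.IsConstrained and the 5-second requeue come from the diffs that follow (imports and Reconcile plumbing elided).

// Illustrative only: the shape of the gate each reconciler applies before accepting
// a CR in the New phase. requeueIfConstrained is a hypothetical helper; the commit
// inlines this check in each Reconcile method instead.
func requeueIfConstrained(ctx context.Context, counter *exposer.VgdpCounter, log logrus.FieldLogger) (ctrl.Result, bool) {
	if counter != nil && counter.IsConstrained(ctx, log) {
		// Too many loads are already between Accepted and Prepared; back off and retry.
		log.Debug("Data path initiation is constrained, requeue later")
		return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, true
	}
	return ctrl.Result{}, false
}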


@@ -149,3 +149,9 @@ func (b *PodVolumeBackupBuilder) OwnerReference(ref metav1.OwnerReference) *PodV
b.object.OwnerReferences = append(b.object.OwnerReferences, ref)
return b
}
// Labels sets the PodVolumeBackup's Labels.
func (b *PodVolumeBackupBuilder) Labels(label map[string]string) *PodVolumeBackupBuilder {
b.object.Labels = label
return b
}


@@ -133,3 +133,9 @@ func (b *PodVolumeRestoreBuilder) Node(node string) *PodVolumeRestoreBuilder {
b.object.Status.Node = node
return b
}
// Labels sets the PodVolumeRestoreBuilder's Labels.
func (b *PodVolumeRestoreBuilder) Labels(label map[string]string) *PodVolumeRestoreBuilder {
b.object.Labels = label
return b
}


@@ -57,6 +57,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/constant"
"github.com/vmware-tanzu/velero/pkg/controller"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
@@ -140,6 +141,7 @@ type nodeAgentServer struct {
csiSnapshotClient *snapshotv1client.Clientset
dataPathMgr *datapath.Manager
dataPathConfigs *nodeagent.Configs
vgdpCounter *exposer.VgdpCounter
}
func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, config nodeAgentServerConfig) (*nodeAgentServer, error) {
@@ -300,12 +302,21 @@ func (s *nodeAgentServer) run() {
}
}
pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.metrics, s.logger)
if s.dataPathConfigs != nil && s.dataPathConfigs.LoadConcurrency != nil && s.dataPathConfigs.LoadConcurrency.PrepareQueueLength > 0 {
if counter, err := exposer.StartVgdpCounter(s.ctx, s.mgr, s.dataPathConfigs.LoadConcurrency.PrepareQueueLength); err != nil {
s.logger.WithError(err).Warnf("Failed to start VGDP counter, VGDP loads are not constrained")
} else {
s.vgdpCounter = counter
s.logger.Infof("VGDP loads are constrained with %d", s.dataPathConfigs.LoadConcurrency.PrepareQueueLength)
}
}
pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.vgdpCounter, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.metrics, s.logger)
if err := pvbReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerPodVolumeBackup)
}
pvrReconciler := controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.logger)
pvrReconciler := controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.vgdpCounter, s.nodeName, s.config.dataMoverPrepareTimeout, s.config.resourceTimeout, podResources, s.logger)
if err := pvrReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}
@@ -320,6 +331,7 @@ func (s *nodeAgentServer) run() {
s.kubeClient,
s.csiSnapshotClient.SnapshotV1(),
s.dataPathMgr,
s.vgdpCounter,
loadAffinity,
backupPVCConfig,
podResources,
@@ -344,6 +356,7 @@ func (s *nodeAgentServer) run() {
s.mgr,
s.kubeClient,
s.dataPathMgr,
s.vgdpCounter,
loadAffinity,
restorePVCConfig,
podResources,


@@ -64,6 +64,7 @@ type DataDownloadReconciler struct {
restoreExposer exposer.GenericRestoreExposer
nodeName string
dataPathMgr *datapath.Manager
vgdpCounter *exposer.VgdpCounter
loadAffinity []*kube.LoadAffinity
restorePVCConfig nodeagent.RestorePVC
podResources corev1api.ResourceRequirements
@@ -77,6 +78,7 @@ func NewDataDownloadReconciler(
mgr manager.Manager,
kubeClient kubernetes.Interface,
dataPathMgr *datapath.Manager,
counter *exposer.VgdpCounter,
loadAffinity []*kube.LoadAffinity,
restorePVCConfig nodeagent.RestorePVC,
podResources corev1api.ResourceRequirements,
@@ -95,6 +97,7 @@ func NewDataDownloadReconciler(
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
restorePVCConfig: restorePVCConfig,
dataPathMgr: dataPathMgr,
vgdpCounter: counter,
loadAffinity: loadAffinity,
podResources: podResources,
preparingTimeout: preparingTimeout,
@@ -220,13 +223,26 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
if dd.Status.Phase == "" || dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseNew {
log.Info("Data download starting")
if dd.Spec.Cancel {
log.Debugf("Data download is canceled in Phase %s", dd.Status.Phase)
r.tryCancelDataDownload(ctx, dd, "")
return ctrl.Result{}, nil
}
if r.vgdpCounter != nil && r.vgdpCounter.IsConstrained(ctx, r.logger) {
log.Debug("Data path initiation is constrained, requeue later")
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
}
if _, err := r.getTargetPVC(ctx, dd); err != nil {
log.WithField("error", err).Debugf("Cannot find target PVC for DataDownload yet. Retry later.")
return ctrl.Result{Requeue: true}, nil
}
log.Info("Data download starting")
accepted, err := r.acceptDataDownload(ctx, dd)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "error accepting the data download %s", dd.Name)
@@ -239,12 +255,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
log.Info("Data download is accepted")
if dd.Spec.Cancel {
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
return ctrl.Result{}, nil
}
exposeParam, err := r.setupExposeParam(dd)
if err != nil {
return r.errorOut(ctx, dd, err, "failed to set exposer parameters", log)
@@ -312,7 +322,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
dd.Name, dd.Namespace, result.ByPod.HostingPod.Name, result.ByPod.HostingContainer, dd.Name, callbacks, false, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("Data path instance is concurrent limited requeue later")
log.Debug("Data path instance is concurrent limited requeue later")
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
} else {
return r.errorOut(ctx, dd, err, "error to create data path", log)
@@ -337,6 +347,8 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
delete(dd.Labels, exposer.ExposeOnGoingLabel)
return true
}); err != nil {
log.WithError(err).Warnf("Failed to update datadownload %s to InProgress, will data path close and retry", dd.Name)
@@ -454,6 +466,8 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCompleted
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
delete(dd.Labels, exposer.ExposeOnGoingLabel)
return true
}); err != nil {
log.WithError(err).Error("error updating data download status")
@@ -504,6 +518,8 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
}
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
delete(dd.Labels, exposer.ExposeOnGoingLabel)
return true
}); err != nil {
log.WithError(err).Error("error updating data download status")
@@ -525,6 +541,8 @@ func (r *DataDownloadReconciler) tryCancelDataDownload(ctx context.Context, dd *
if message != "" {
dataDownload.Status.Message = message
}
delete(dataDownload.Labels, exposer.ExposeOnGoingLabel)
})
if err != nil {
@@ -702,6 +720,8 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v
dd.Status.Message = errors.WithMessage(err, msg).Error()
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
delete(dd.Labels, exposer.ExposeOnGoingLabel)
return true
}); patchErr != nil {
log.WithError(patchErr).Error("error updating DataDownload status")
@@ -724,6 +744,11 @@ func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *vel
datadownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted
datadownload.Status.AcceptedByNode = r.nodeName
datadownload.Status.AcceptedTimestamp = &metav1.Time{Time: r.Clock.Now()}
if datadownload.Labels == nil {
datadownload.Labels = make(map[string]string)
}
datadownload.Labels[exposer.ExposeOnGoingLabel] = "true"
}
succeeded, err := funcExclusiveUpdateDataDownload(ctx, r.client, updated, updateFunc)
@@ -749,6 +774,8 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler
succeeded, err := funcExclusiveUpdateDataDownload(ctx, r.client, dd, func(dd *velerov2alpha1api.DataDownload) {
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = "timeout on preparing data download"
delete(dd.Labels, exposer.ExposeOnGoingLabel)
})
if err != nil {


@@ -136,6 +136,7 @@ func initDataDownloadReconcilerWithError(t *testing.T, objects []any, needError
fakeKubeClient,
dataPathMgr,
nil,
nil,
nodeagent.RestorePVC{},
corev1api.ResourceRequirements{},
"test-node",
@@ -195,6 +196,7 @@ func TestDataDownloadReconcile(t *testing.T) {
mockCancel bool
mockClose bool
needExclusiveUpdateError error
constrained bool
expected *velerov2alpha1api.DataDownload
expectDeleted bool
expectCancelRecord bool
@@ -295,6 +297,20 @@ func TestDataDownloadReconcile(t *testing.T) {
name: "Unknown data download status",
dd: dataDownloadBuilder().Phase("Unknown").Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
},
{
name: "dd is cancel on new",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
expectCancelRecord: true,
expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(),
},
{
name: "new dd but constrained",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
constrained: true,
expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "new dd but no target PVC",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
@@ -308,12 +324,6 @@ func TestDataDownloadReconcile(t *testing.T) {
expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expectedErr: "error accepting the data download datadownload-1: exclusive-update-error",
},
{
name: "dd is cancel on accepted",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
expected: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(),
},
{
name: "dd is accepted but setup expose param failed",
dd: dataDownloadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).NodeOS("xxx").Result(),
@@ -488,6 +498,10 @@ func TestDataDownloadReconcile(t *testing.T) {
r.cancelledDataDownload[test.dd.Name] = test.sportTime.Time
}
if test.constrained {
r.vgdpCounter = &exposer.VgdpCounter{}
}
funcExclusiveUpdateDataDownload = exclusiveUpdateDataDownload
if test.needExclusiveUpdateError != nil {
funcExclusiveUpdateDataDownload = func(context.Context, kbclient.Client, *velerov2alpha1api.DataDownload, func(*velerov2alpha1api.DataDownload)) (bool, error) {
@@ -571,13 +585,13 @@ func TestDataDownloadReconcile(t *testing.T) {
assert.Equal(t, test.expectedResult.RequeueAfter, actualResult.RequeueAfter)
}
if test.expected != nil || test.expectDeleted {
dd := velerov2alpha1api.DataDownload{}
err = r.client.Get(ctx, kbclient.ObjectKey{
Name: test.dd.Name,
Namespace: test.dd.Namespace,
}, &dd)
dd := velerov2alpha1api.DataDownload{}
err = r.client.Get(ctx, kbclient.ObjectKey{
Name: test.dd.Name,
Namespace: test.dd.Namespace,
}, &dd)
if test.expected != nil || test.expectDeleted {
if test.expectDeleted {
assert.True(t, apierrors.IsNotFound(err))
} else {
@@ -601,6 +615,12 @@ func TestDataDownloadReconcile(t *testing.T) {
} else {
assert.Empty(t, r.cancelledDataDownload)
}
if isDataDownloadInFinalState(&dd) || dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
assert.NotContains(t, dd.Labels, exposer.ExposeOnGoingLabel)
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
assert.Contains(t, dd.Labels, exposer.ExposeOnGoingLabel)
}
})
}
}


@@ -74,6 +74,7 @@ type DataUploadReconciler struct {
logger logrus.FieldLogger
snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
dataPathMgr *datapath.Manager
vgdpCounter *exposer.VgdpCounter
loadAffinity []*kube.LoadAffinity
backupPVCConfig map[string]nodeagent.BackupPVC
podResources corev1api.ResourceRequirements
@@ -88,6 +89,7 @@ func NewDataUploadReconciler(
kubeClient kubernetes.Interface,
csiSnapshotClient snapshotter.SnapshotV1Interface,
dataPathMgr *datapath.Manager,
counter *exposer.VgdpCounter,
loadAffinity []*kube.LoadAffinity,
backupPVCConfig map[string]nodeagent.BackupPVC,
podResources corev1api.ResourceRequirements,
@@ -113,6 +115,7 @@ func NewDataUploadReconciler(
),
},
dataPathMgr: dataPathMgr,
vgdpCounter: counter,
loadAffinity: loadAffinity,
backupPVCConfig: backupPVCConfig,
podResources: podResources,
@@ -241,6 +244,19 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
if du.Status.Phase == "" || du.Status.Phase == velerov2alpha1api.DataUploadPhaseNew {
if du.Spec.Cancel {
log.Debugf("Data upload is canceled in Phase %s", du.Status.Phase)
r.tryCancelDataUpload(ctx, du, "")
return ctrl.Result{}, nil
}
if r.vgdpCounter != nil && r.vgdpCounter.IsConstrained(ctx, r.logger) {
log.Debug("Data path initiation is constrained, requeue later")
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
}
log.Info("Data upload starting")
accepted, err := r.acceptDataUpload(ctx, du)
@@ -255,11 +271,6 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
log.Info("Data upload is accepted")
if du.Spec.Cancel {
r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
return ctrl.Result{}, nil
}
exposeParam, err := r.setupExposeParam(du)
if err != nil {
return r.errorOut(ctx, du, err, "failed to set exposer parameters", log)
@@ -330,7 +341,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
du.Name, du.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, du.Name, callbacks, false, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("Data path instance is concurrent limited requeue later")
log.Debug("Data path instance is concurrent limited requeue later")
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
} else {
return r.errorOut(ctx, du, err, "error to create data path", log)
@@ -356,6 +367,8 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
du.Status.NodeOS = velerov2alpha1api.NodeOS(*res.ByPod.NodeOS)
delete(du.Labels, exposer.ExposeOnGoingLabel)
return true
}); err != nil {
log.WithError(err).Warnf("Failed to update dataupload %s to InProgress, will data path close and retry", du.Name)
@@ -481,6 +494,8 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp
du.Status.Message = "volume was empty so no data was upload"
}
delete(du.Labels, exposer.ExposeOnGoingLabel)
return true
}); err != nil {
log.WithError(err).Error("error updating DataUpload status")
@@ -531,6 +546,8 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp
}
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
delete(du.Labels, exposer.ExposeOnGoingLabel)
return true
}); err != nil {
log.WithError(err).Error("error updating DataUpload status")
@@ -552,6 +569,8 @@ func (r *DataUploadReconciler) tryCancelDataUpload(ctx context.Context, du *vele
if message != "" {
dataUpload.Status.Message = message
}
delete(dataUpload.Labels, exposer.ExposeOnGoingLabel)
})
if err != nil {
@@ -760,6 +779,8 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel
}
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
delete(du.Labels, exposer.ExposeOnGoingLabel)
return true
}); patchErr != nil {
log.WithError(patchErr).Error("error updating DataUpload status")
@@ -781,6 +802,11 @@ func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov
dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseAccepted
dataUpload.Status.AcceptedByNode = r.nodeName
dataUpload.Status.AcceptedTimestamp = &metav1.Time{Time: r.Clock.Now()}
if dataUpload.Labels == nil {
dataUpload.Labels = make(map[string]string)
}
dataUpload.Labels[exposer.ExposeOnGoingLabel] = "true"
}
succeeded, err := funcExclusiveUpdateDataUpload(ctx, r.client, updated, updateFunc)
@@ -807,6 +833,8 @@ func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov
succeeded, err := funcExclusiveUpdateDataUpload(ctx, r.client, du, func(du *velerov2alpha1api.DataUpload) {
du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed
du.Status.Message = "timeout on preparing data upload"
delete(du.Labels, exposer.ExposeOnGoingLabel)
})
if err != nil {


@@ -241,6 +241,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
fakeSnapshotClient.SnapshotV1(),
dataPathMgr,
nil,
nil,
map[string]nodeagent.BackupPVC{},
corev1api.ResourceRequirements{},
testclocks.NewFakeClock(now),
@@ -258,7 +259,6 @@ func dataUploadBuilder() *builder.DataUploadBuilder {
VolumeSnapshot: "fake-volume-snapshot",
}
return builder.ForDataUpload(velerov1api.DefaultNamespace, dataUploadName).
Labels(map[string]string{velerov1api.DataUploadLabel: dataUploadName}).
BackupStorageLocation("bsl-loc").
DataMover("velero").
SnapshotType("CSI").SourceNamespace("fake-ns").SourcePVC("test-pvc").CSISnapshot(csi)
@@ -361,6 +361,7 @@ func TestReconcile(t *testing.T) {
getExposeNil bool
fsBRInitErr error
fsBRStartErr error
constrained bool
expectedErr string
expectedResult *ctrl.Result
expectDataPath bool
@@ -464,6 +465,19 @@ func TestReconcile(t *testing.T) {
expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
expectedErr: "unknown type type of snapshot exposer is not exist",
},
{
name: "du is cancel on new",
du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(),
expectCancelRecord: true,
expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(),
},
{
name: "new du but constrained",
du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
constrained: true,
expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "new du but accept failed",
du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
@@ -471,11 +485,6 @@ func TestReconcile(t *testing.T) {
expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
expectedErr: "error accepting the data upload dataupload-1: exclusive-update-error",
},
{
name: "du is cancel on accepted",
du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Result(),
expected: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Cancel(true).Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(),
},
{
name: "du is accepted but setup expose param failed on getting PVC",
du: dataUploadBuilder().Finalizers([]string{DataUploadDownloadFinalizer}).Result(),
@@ -636,6 +645,10 @@ func TestReconcile(t *testing.T) {
r.cancelledDataUpload[test.du.Name] = test.sportTime.Time
}
if test.constrained {
r.vgdpCounter = &exposer.VgdpCounter{}
}
if test.du.Spec.SnapshotType == fakeSnapshotType {
r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.Clock, test.ambiguousNodeOS, test.peekErr, test.exposeErr, test.getExposeErr, test.getExposeNil}}
} else if test.du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
@@ -683,13 +696,13 @@ func TestReconcile(t *testing.T) {
assert.Equal(t, test.expectedResult.RequeueAfter, actualResult.RequeueAfter)
}
if test.expected != nil || test.expectDeleted {
du := velerov2alpha1api.DataUpload{}
err = r.client.Get(ctx, kbclient.ObjectKey{
Name: test.du.Name,
Namespace: test.du.Namespace,
}, &du)
du := velerov2alpha1api.DataUpload{}
err = r.client.Get(ctx, kbclient.ObjectKey{
Name: test.du.Name,
Namespace: test.du.Namespace,
}, &du)
if test.expected != nil || test.expectDeleted {
if test.expectDeleted {
assert.True(t, apierrors.IsNotFound(err))
} else {
@@ -713,6 +726,12 @@ func TestReconcile(t *testing.T) {
} else {
assert.Empty(t, r.cancelledDataUpload)
}
if isDataUploadInFinalState(&du) || du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
assert.NotContains(t, du.Labels, exposer.ExposeOnGoingLabel)
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted {
assert.Contains(t, du.Labels, exposer.ExposeOnGoingLabel)
}
})
}
}


@@ -59,7 +59,7 @@ const (
// NewPodVolumeBackupReconciler creates the PodVolumeBackupReconciler instance
func NewPodVolumeBackupReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
nodeName string, preparingTimeout time.Duration, resourceTimeout time.Duration, podResources corev1api.ResourceRequirements,
counter *exposer.VgdpCounter, nodeName string, preparingTimeout time.Duration, resourceTimeout time.Duration, podResources corev1api.ResourceRequirements,
metrics *metrics.ServerMetrics, logger logrus.FieldLogger) *PodVolumeBackupReconciler {
return &PodVolumeBackupReconciler{
client: client,
@@ -71,6 +71,7 @@ func NewPodVolumeBackupReconciler(client client.Client, mgr manager.Manager, kub
metrics: metrics,
podResources: podResources,
dataPathMgr: dataPathMgr,
vgdpCounter: counter,
preparingTimeout: preparingTimeout,
resourceTimeout: resourceTimeout,
exposer: exposer.NewPodVolumeExposer(kubeClient, logger),
@@ -90,6 +91,7 @@ type PodVolumeBackupReconciler struct {
logger logrus.FieldLogger
podResources corev1api.ResourceRequirements
dataPathMgr *datapath.Manager
vgdpCounter *exposer.VgdpCounter
preparingTimeout time.Duration
resourceTimeout time.Duration
cancelledPVB map[string]time.Time
@@ -212,6 +214,11 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, nil
}
if r.vgdpCounter != nil && r.vgdpCounter.IsConstrained(ctx, r.logger) {
log.Debug("Data path initiation is constrained, requeue later")
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
}
log.Info("Accepting PVB")
if err := r.acceptPodVolumeBackup(ctx, pvb); err != nil {
@@ -278,7 +285,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
pvb.Name, pvb.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, pvb.Name, callbacks, false, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("Data path instance is concurrent limited requeue later")
log.Debug("Data path instance is concurrent limited requeue later")
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
} else {
return r.errorOut(ctx, pvb, err, "error to create data path", log)
@@ -304,6 +311,8 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseInProgress
pvb.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
delete(pvb.Labels, exposer.ExposeOnGoingLabel)
return true
}); err != nil {
log.WithError(err).Warnf("Failed to update PVB %s to InProgress, will data path close and retry", pvb.Name)
@@ -370,6 +379,11 @@ func (r *PodVolumeBackupReconciler) acceptPodVolumeBackup(ctx context.Context, p
pvb.Status.AcceptedTimestamp = &metav1.Time{Time: r.clock.Now()}
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseAccepted
if pvb.Labels == nil {
pvb.Labels = make(map[string]string)
}
pvb.Labels[exposer.ExposeOnGoingLabel] = "true"
return true
})
}
@@ -386,6 +400,8 @@ func (r *PodVolumeBackupReconciler) tryCancelPodVolumeBackup(ctx context.Context
if message != "" {
pvb.Status.Message = message
}
delete(pvb.Labels, exposer.ExposeOnGoingLabel)
})
if err != nil {
@@ -428,6 +444,8 @@ func (r *PodVolumeBackupReconciler) onPrepareTimeout(ctx context.Context, pvb *v
succeeded, err := funcExclusiveUpdatePodVolumeBackup(ctx, r.client, pvb, func(pvb *velerov1api.PodVolumeBackup) {
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
pvb.Status.Message = "timeout on preparing PVB"
delete(pvb.Labels, exposer.ExposeOnGoingLabel)
})
if err != nil {
@@ -508,6 +526,8 @@ func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, nam
pvb.Status.Message = "volume was empty so no snapshot was taken"
}
delete(pvb.Labels, exposer.ExposeOnGoingLabel)
return true
}); err != nil {
log.WithError(err).Error("error updating PVB status")
@@ -565,6 +585,8 @@ func (r *PodVolumeBackupReconciler) OnDataPathCancelled(ctx context.Context, nam
}
pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
delete(pvb.Labels, exposer.ExposeOnGoingLabel)
return true
}); err != nil {
log.WithError(err).Error("error updating PVB status on cancel")
@@ -741,6 +763,8 @@ func UpdatePVBStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1
pvb.Status.StartTimestamp = &metav1.Time{Time: time}
}
delete(pvb.Labels, exposer.ExposeOnGoingLabel)
return true
}); patchErr != nil {
log.WithError(patchErr).Warn("error updating PVB status")


@@ -144,6 +144,7 @@ func initPVBReconcilerWithError(needError ...error) (*PodVolumeBackupReconciler,
nil,
fakeKubeClient,
dataPathMgr,
nil,
"test-node",
time.Minute*5,
time.Minute,
@@ -224,6 +225,7 @@ func TestPVBReconcile(t *testing.T) {
getExposeNil bool
fsBRInitErr error
fsBRStartErr error
constrained bool
expectedErr string
expectedResult *ctrl.Result
expectDataPath bool
@@ -317,6 +319,13 @@ func TestPVBReconcile(t *testing.T) {
name: "Unknown pvb status",
pvb: pvbBuilder().Phase("Unknown").Finalizers([]string{PodVolumeFinalizer}).Result(),
},
{
name: "new pvb but constrained",
pvb: pvbBuilder().Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
constrained: true,
expected: pvbBuilder().Finalizers([]string{PodVolumeFinalizer}).Result(),
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "new pvb but accept failed",
pvb: pvbBuilder().Finalizers([]string{PodVolumeFinalizer}).Node("test-node").Result(),
@@ -480,6 +489,10 @@ func TestPVBReconcile(t *testing.T) {
r.cancelledPVB[test.pvb.Name] = test.sportTime.Time
}
if test.constrained {
r.vgdpCounter = &exposer.VgdpCounter{}
}
if test.needMockExposer {
r.exposer = &fakePvbExposer{r.client, r.clock, test.peekErr, test.exposeErr, test.getExposeErr, test.getExposeNil}
}
@@ -525,13 +538,13 @@ func TestPVBReconcile(t *testing.T) {
assert.Equal(t, test.expectedResult.RequeueAfter, actualResult.RequeueAfter)
}
if test.expected != nil || test.expectDeleted {
pvb := velerov1api.PodVolumeBackup{}
err = r.client.Get(ctx, client.ObjectKey{
Name: test.pvb.Name,
Namespace: test.pvb.Namespace,
}, &pvb)
pvb := velerov1api.PodVolumeBackup{}
err = r.client.Get(ctx, client.ObjectKey{
Name: test.pvb.Name,
Namespace: test.pvb.Namespace,
}, &pvb)
if test.expected != nil || test.expectDeleted {
if test.expectDeleted {
assert.True(t, apierrors.IsNotFound(err))
} else {
@@ -555,6 +568,12 @@ func TestPVBReconcile(t *testing.T) {
} else {
assert.Empty(t, r.cancelledPVB)
}
if isPVBInFinalState(&pvb) || pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseInProgress {
assert.NotContains(t, pvb.Labels, exposer.ExposeOnGoingLabel)
} else if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseAccepted {
assert.Contains(t, pvb.Labels, exposer.ExposeOnGoingLabel)
}
})
}
}


@@ -55,7 +55,7 @@ import (
)
func NewPodVolumeRestoreReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
nodeName string, preparingTimeout time.Duration, resourceTimeout time.Duration, podResources corev1api.ResourceRequirements,
counter *exposer.VgdpCounter, nodeName string, preparingTimeout time.Duration, resourceTimeout time.Duration, podResources corev1api.ResourceRequirements,
logger logrus.FieldLogger) *PodVolumeRestoreReconciler {
return &PodVolumeRestoreReconciler{
client: client,
@@ -66,6 +66,7 @@ func NewPodVolumeRestoreReconciler(client client.Client, mgr manager.Manager, ku
clock: &clocks.RealClock{},
podResources: podResources,
dataPathMgr: dataPathMgr,
vgdpCounter: counter,
preparingTimeout: preparingTimeout,
resourceTimeout: resourceTimeout,
exposer: exposer.NewPodVolumeExposer(kubeClient, logger),
@@ -83,6 +84,7 @@ type PodVolumeRestoreReconciler struct {
podResources corev1api.ResourceRequirements
exposer exposer.PodVolumeExposer
dataPathMgr *datapath.Manager
vgdpCounter *exposer.VgdpCounter
preparingTimeout time.Duration
resourceTimeout time.Duration
cancelledPVR map[string]time.Time
@@ -210,6 +212,11 @@ func (r *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, nil
}
if r.vgdpCounter != nil && r.vgdpCounter.IsConstrained(ctx, r.logger) {
log.Debug("Data path initiation is constrained, requeue later")
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
}
log.Info("Accepting PVR")
if err := r.acceptPodVolumeRestore(ctx, pvr); err != nil {
@@ -282,7 +289,7 @@ func (r *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req
pvr.Name, pvr.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, pvr.Name, callbacks, false, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("Data path instance is concurrent limited requeue later")
log.Debug("Data path instance is concurrent limited requeue later")
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
} else {
return r.errorOut(ctx, pvr, err, "error to create data path", log)
@@ -306,6 +313,8 @@ func (r *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseInProgress
pvr.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
delete(pvr.Labels, exposer.ExposeOnGoingLabel)
return true
}); err != nil {
log.WithError(err).Warnf("Failed to update PVR %s to InProgress, will data path close and retry", pvr.Name)
@@ -373,6 +382,11 @@ func (r *PodVolumeRestoreReconciler) acceptPodVolumeRestore(ctx context.Context,
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseAccepted
pvr.Status.Node = r.nodeName
if pvr.Labels == nil {
pvr.Labels = make(map[string]string)
}
pvr.Labels[exposer.ExposeOnGoingLabel] = "true"
return true
})
}
@@ -389,6 +403,8 @@ func (r *PodVolumeRestoreReconciler) tryCancelPodVolumeRestore(ctx context.Conte
if message != "" {
pvr.Status.Message = message
}
delete(pvr.Labels, exposer.ExposeOnGoingLabel)
})
if err != nil {
@@ -433,6 +449,8 @@ func (r *PodVolumeRestoreReconciler) onPrepareTimeout(ctx context.Context, pvr *
succeeded, err := funcExclusiveUpdatePodVolumeRestore(ctx, r.client, pvr, func(pvr *velerov1api.PodVolumeRestore) {
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed
pvr.Status.Message = "timeout on preparing PVR"
delete(pvr.Labels, exposer.ExposeOnGoingLabel)
})
if err != nil {
@@ -499,6 +517,8 @@ func UpdatePVRStatusToFailed(ctx context.Context, c client.Client, pvr *velerov1
pvr.Status.Message = errors.WithMessage(err, msg).Error()
pvr.Status.CompletionTimestamp = &metav1.Time{Time: time}
delete(pvr.Labels, exposer.ExposeOnGoingLabel)
return true
}); patchErr != nil {
log.WithError(patchErr).Warn("error updating PVR status")
@@ -749,6 +769,8 @@ func (r *PodVolumeRestoreReconciler) OnDataPathCompleted(ctx context.Context, na
pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCompleted
pvr.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
delete(pvr.Labels, exposer.ExposeOnGoingLabel)
return true
}); err != nil {
log.WithError(err).Error("error updating PVR status")
@@ -798,6 +820,8 @@ func (r *PodVolumeRestoreReconciler) OnDataPathCancelled(ctx context.Context, na
}
pvr.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
delete(pvr.Labels, exposer.ExposeOnGoingLabel)
return true
}); err != nil {
log.WithError(err).Error("error updating PVR status on cancel")


@@ -617,7 +617,7 @@ func initPodVolumeRestoreReconcilerWithError(objects []runtime.Object, cliObj []
dataPathMgr := datapath.NewManager(1)
return NewPodVolumeRestoreReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, "test-node", time.Minute*5, time.Minute, corev1api.ResourceRequirements{}, velerotest.NewLogger()), nil
return NewPodVolumeRestoreReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, nil, "test-node", time.Minute*5, time.Minute, corev1api.ResourceRequirements{}, velerotest.NewLogger()), nil
}
func TestPodVolumeRestoreReconcile(t *testing.T) {
@@ -669,6 +669,7 @@ func TestPodVolumeRestoreReconcile(t *testing.T) {
mockCancel bool
mockClose bool
needExclusiveUpdateError error
constrained bool
expected *velerov1api.PodVolumeRestore
expectDeleted bool
expectCancelRecord bool
@@ -765,6 +766,14 @@ func TestPodVolumeRestoreReconcile(t *testing.T) {
name: "Unknown pvr status",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Phase("Unknown").Finalizers([]string{PodVolumeFinalizer}).Result(),
},
{
name: "new pvr but constrained",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).PodNamespace("test-ns").PodName("test-pod").Result(),
targetPod: builder.ForPod("test-ns", "test-pod").InitContainers(&corev1api.Container{Name: restorehelper.WaitInitContainer}).InitContainerState(corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}).Result(),
constrained: true,
expected: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).Result(),
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "new pvr but accept failed",
pvr: builder.ForPodVolumeRestore(velerov1api.DefaultNamespace, pvrName).Finalizers([]string{PodVolumeFinalizer}).PodNamespace("test-ns").PodName("test-pod").Result(),
@@ -942,6 +951,10 @@ func TestPodVolumeRestoreReconcile(t *testing.T) {
r.cancelledPVR[test.pvr.Name] = test.sportTime.Time
}
if test.constrained {
r.vgdpCounter = &exposer.VgdpCounter{}
}
funcExclusiveUpdatePodVolumeRestore = exclusiveUpdatePodVolumeRestore
if test.needExclusiveUpdateError != nil {
funcExclusiveUpdatePodVolumeRestore = func(context.Context, kbclient.Client, *velerov1api.PodVolumeRestore, func(*velerov1api.PodVolumeRestore)) (bool, error) {
@@ -1029,13 +1042,13 @@ func TestPodVolumeRestoreReconcile(t *testing.T) {
assert.Equal(t, test.expectedResult.RequeueAfter, actualResult.RequeueAfter)
}
if test.expected != nil || test.expectDeleted {
pvr := velerov1api.PodVolumeRestore{}
err = r.client.Get(ctx, kbclient.ObjectKey{
Name: test.pvr.Name,
Namespace: test.pvr.Namespace,
}, &pvr)
pvr := velerov1api.PodVolumeRestore{}
err = r.client.Get(ctx, kbclient.ObjectKey{
Name: test.pvr.Name,
Namespace: test.pvr.Namespace,
}, &pvr)
if test.expected != nil || test.expectDeleted {
if test.expectDeleted {
assert.True(t, apierrors.IsNotFound(err))
} else {
@@ -1059,6 +1072,12 @@ func TestPodVolumeRestoreReconcile(t *testing.T) {
} else {
assert.Empty(t, r.cancelledPVR)
}
if isPVRInFinalState(&pvr) || pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseInProgress {
assert.NotContains(t, pvr.Labels, exposer.ExposeOnGoingLabel)
} else if pvr.Status.Phase == velerov1api.PodVolumeRestorePhaseAccepted {
assert.Contains(t, pvr.Labels, exposer.ExposeOnGoingLabel)
}
})
}
}


@@ -26,6 +26,7 @@ const (
podGroupLabel = "velero.io/exposer-pod-group"
podGroupSnapshot = "snapshot-exposer"
podGroupGenericRestore = "generic-restore-exposer"
ExposeOnGoingLabel = "velero.io/expose-on-going"
)
// ExposeResult defines the result of expose.

pkg/exposer/vgdp_counter.go (new file, 223 lines)

@@ -0,0 +1,223 @@
package exposer
import (
"context"
"sync/atomic"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"sigs.k8s.io/controller-runtime/pkg/manager"
ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
)
type dynamicQueueLength struct {
queueLength int
changeID uint64
}
type VgdpCounter struct {
client ctlclient.Client
allowedQueueLength int
duState dynamicQueueLength
ddState dynamicQueueLength
pvbState dynamicQueueLength
pvrState dynamicQueueLength
duCacheState dynamicQueueLength
ddCacheState dynamicQueueLength
pvbCacheState dynamicQueueLength
pvrCacheState dynamicQueueLength
}
func StartVgdpCounter(ctx context.Context, mgr manager.Manager, queueLength int) (*VgdpCounter, error) {
counter := &VgdpCounter{
client: mgr.GetClient(),
allowedQueueLength: queueLength,
}
atomic.StoreUint64(&counter.duState.changeID, 1)
atomic.StoreUint64(&counter.ddState.changeID, 1)
atomic.StoreUint64(&counter.pvbState.changeID, 1)
atomic.StoreUint64(&counter.pvrState.changeID, 1)
if err := counter.initListeners(ctx, mgr); err != nil {
return nil, err
}
return counter, nil
}
func (w *VgdpCounter) initListeners(ctx context.Context, mgr manager.Manager) error {
duInformer, err := mgr.GetCache().GetInformer(ctx, &velerov2alpha1api.DataUpload{})
if err != nil {
return errors.Wrap(err, "error getting du informer")
}
if _, err := duInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj any) {
oldDu := oldObj.(*velerov2alpha1api.DataUpload)
newDu := newObj.(*velerov2alpha1api.DataUpload)
if oldDu.Status.Phase == newDu.Status.Phase {
return
}
if newDu.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted ||
oldDu.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared ||
oldDu.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted && newDu.Status.Phase != velerov2alpha1api.DataUploadPhasePrepared {
atomic.AddUint64(&w.duState.changeID, 1)
}
},
},
); err != nil {
return errors.Wrap(err, "error registering du handler")
}
ddInformer, err := mgr.GetCache().GetInformer(ctx, &velerov2alpha1api.DataDownload{})
if err != nil {
return errors.Wrap(err, "error getting dd informer")
}
if _, err := ddInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj any) {
oldDd := oldObj.(*velerov2alpha1api.DataDownload)
newDd := newObj.(*velerov2alpha1api.DataDownload)
if oldDd.Status.Phase == newDd.Status.Phase {
return
}
if newDd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted ||
oldDd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared ||
oldDd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted && newDd.Status.Phase != velerov2alpha1api.DataDownloadPhasePrepared {
atomic.AddUint64(&w.ddState.changeID, 1)
}
},
},
); err != nil {
return errors.Wrap(err, "error registering dd handler")
}
pvbInformer, err := mgr.GetCache().GetInformer(ctx, &velerov1api.PodVolumeBackup{})
if err != nil {
return errors.Wrap(err, "error getting PVB informer")
}
if _, err := pvbInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj any) {
oldPvb := oldObj.(*velerov1api.PodVolumeBackup)
newPvb := newObj.(*velerov1api.PodVolumeBackup)
if oldPvb.Status.Phase == newPvb.Status.Phase {
return
}
if newPvb.Status.Phase == velerov1api.PodVolumeBackupPhaseAccepted ||
oldPvb.Status.Phase == velerov1api.PodVolumeBackupPhasePrepared ||
oldPvb.Status.Phase == velerov1api.PodVolumeBackupPhaseAccepted && newPvb.Status.Phase != velerov1api.PodVolumeBackupPhasePrepared {
atomic.AddUint64(&w.pvbState.changeID, 1)
}
},
},
); err != nil {
return errors.Wrap(err, "error registering PVB handler")
}
pvrInformer, err := mgr.GetCache().GetInformer(ctx, &velerov1api.PodVolumeRestore{})
if err != nil {
return errors.Wrap(err, "error getting PVR informer")
}
if _, err := pvrInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj any) {
oldPvr := oldObj.(*velerov1api.PodVolumeRestore)
newPvr := newObj.(*velerov1api.PodVolumeRestore)
if oldPvr.Status.Phase == newPvr.Status.Phase {
return
}
if newPvr.Status.Phase == velerov1api.PodVolumeRestorePhaseAccepted ||
oldPvr.Status.Phase == velerov1api.PodVolumeRestorePhasePrepared ||
oldPvr.Status.Phase == velerov1api.PodVolumeRestorePhaseAccepted && newPvr.Status.Phase != velerov1api.PodVolumeRestorePhasePrepared {
atomic.AddUint64(&w.pvrState.changeID, 1)
}
},
},
); err != nil {
return errors.Wrap(err, "error registering PVR handler")
}
return nil
}
func (w *VgdpCounter) IsConstrained(ctx context.Context, log logrus.FieldLogger) bool {
id := atomic.LoadUint64(&w.duState.changeID)
if id != w.duCacheState.changeID {
duList := &velerov2alpha1api.DataUploadList{}
if err := w.client.List(ctx, duList, &ctlclient.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{ExposeOnGoingLabel: "true"}))}); err != nil {
log.WithError(err).Warn("Failed to list data uploads, skip counting")
} else {
w.duCacheState.queueLength = len(duList.Items)
w.duCacheState.changeID = id
log.Infof("Query queue length for du %d", w.duCacheState.queueLength)
}
}
id = atomic.LoadUint64(&w.ddState.changeID)
if id != w.ddCacheState.changeID {
ddList := &velerov2alpha1api.DataDownloadList{}
if err := w.client.List(ctx, ddList, &ctlclient.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{ExposeOnGoingLabel: "true"}))}); err != nil {
log.WithError(err).Warn("Failed to list data downloads, skip counting")
} else {
w.ddCacheState.queueLength = len(ddList.Items)
w.ddCacheState.changeID = id
log.Infof("Query queue length for dd %d", w.ddCacheState.queueLength)
}
}
id = atomic.LoadUint64(&w.pvbState.changeID)
if id != w.pvbCacheState.changeID {
pvbList := &velerov1api.PodVolumeBackupList{}
if err := w.client.List(ctx, pvbList, &ctlclient.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{ExposeOnGoingLabel: "true"}))}); err != nil {
log.WithError(err).Warn("Failed to list PVB, skip counting")
} else {
w.pvbCacheState.queueLength = len(pvbList.Items)
w.pvbCacheState.changeID = id
log.Infof("Query queue length for pvb %d", w.pvbCacheState.queueLength)
}
}
id = atomic.LoadUint64(&w.pvrState.changeID)
if id != w.pvrCacheState.changeID {
pvrList := &velerov1api.PodVolumeRestoreList{}
if err := w.client.List(ctx, pvrList, &ctlclient.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{ExposeOnGoingLabel: "true"}))}); err != nil {
log.WithError(err).Warn("Failed to list PVR, skip counting")
} else {
w.pvrCacheState.queueLength = len(pvrList.Items)
w.pvrCacheState.changeID = id
log.Infof("Query queue length for pvr %d", w.pvrCacheState.queueLength)
}
}
existing := w.duCacheState.queueLength + w.ddCacheState.queueLength + w.pvbCacheState.queueLength + w.pvrCacheState.queueLength
constrained := existing >= w.allowedQueueLength
return constrained
}

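One subtlety in vgdp_counter.go is the update-handler predicate: a kind's changeID is bumped only when a CR enters or leaves the Accepted-to-Prepared window, so IsConstrained re-lists that kind only when its cached changeID is stale. The function below is an illustrative restatement (not part of the commit) of the DataUpload condition above; duQueueChanged is a hypothetical name, and the other three kinds follow the same pattern.

// Illustrative restatement of the DataUpload update-handler condition.
// velerov2alpha1api is "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1".
func duQueueChanged(oldPhase, newPhase velerov2alpha1api.DataUploadPhase) bool {
	if oldPhase == newPhase {
		return false // phase unchanged: the queue length cannot have changed
	}
	return newPhase == velerov2alpha1api.DataUploadPhaseAccepted || // a load entered the queue
		oldPhase == velerov2alpha1api.DataUploadPhasePrepared || // a prepared load left the queue
		(oldPhase == velerov2alpha1api.DataUploadPhaseAccepted && newPhase != velerov2alpha1api.DataUploadPhasePrepared) // an accepted load was cancelled or failed before being prepared
}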

@@ -0,0 +1,181 @@
package exposer
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/vmware-tanzu/velero/pkg/builder"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
)
func TestIsConstrained(t *testing.T) {
tests := []struct {
name string
counter VgdpCounter
kubeClientObj []client.Object
getErr bool
expected bool
}{
{
name: "no change, constrained",
counter: VgdpCounter{},
expected: true,
},
{
name: "no change, not constrained",
counter: VgdpCounter{allowedQueueLength: 1},
},
{
name: "change in du, get failed",
counter: VgdpCounter{
allowedQueueLength: 1,
duState: dynamicQueueLength{0, 1},
},
getErr: true,
},
{
name: "change in du, constrained",
counter: VgdpCounter{
allowedQueueLength: 1,
duState: dynamicQueueLength{0, 1},
},
kubeClientObj: []client.Object{
builder.ForDataUpload("velero", "test-1").Labels(map[string]string{ExposeOnGoingLabel: "true"}).Result(),
},
expected: true,
},
{
name: "change in dd, get failed",
counter: VgdpCounter{
allowedQueueLength: 1,
ddState: dynamicQueueLength{0, 1},
},
getErr: true,
},
{
name: "change in dd, constrained",
counter: VgdpCounter{
allowedQueueLength: 1,
ddState: dynamicQueueLength{0, 1},
},
kubeClientObj: []client.Object{
builder.ForDataDownload("velero", "test-1").Labels(map[string]string{ExposeOnGoingLabel: "true"}).Result(),
},
expected: true,
},
{
name: "change in pvb, get failed",
counter: VgdpCounter{
allowedQueueLength: 1,
pvbState: dynamicQueueLength{0, 1},
},
getErr: true,
},
{
name: "change in pvb, constrained",
counter: VgdpCounter{
allowedQueueLength: 1,
pvbState: dynamicQueueLength{0, 1},
},
kubeClientObj: []client.Object{
builder.ForPodVolumeBackup("velero", "test-1").Labels(map[string]string{ExposeOnGoingLabel: "true"}).Result(),
},
expected: true,
},
{
name: "change in pvr, get failed",
counter: VgdpCounter{
allowedQueueLength: 1,
pvrState: dynamicQueueLength{0, 1},
},
getErr: true,
},
{
name: "change in pvr, constrained",
counter: VgdpCounter{
allowedQueueLength: 1,
pvrState: dynamicQueueLength{0, 1},
},
kubeClientObj: []client.Object{
builder.ForPodVolumeRestore("velero", "test-1").Labels(map[string]string{ExposeOnGoingLabel: "true"}).Result(),
},
expected: true,
},
{
name: "change in du, pvb, not constrained",
counter: VgdpCounter{
allowedQueueLength: 3,
duState: dynamicQueueLength{0, 1},
pvbState: dynamicQueueLength{0, 1},
},
kubeClientObj: []client.Object{
builder.ForDataUpload("velero", "test-1").Labels(map[string]string{ExposeOnGoingLabel: "true"}).Result(),
builder.ForPodVolumeBackup("velero", "test-1").Labels(map[string]string{ExposeOnGoingLabel: "true"}).Result(),
},
},
{
name: "change in dd, pvr, constrained",
counter: VgdpCounter{
allowedQueueLength: 1,
ddState: dynamicQueueLength{0, 1},
pvrState: dynamicQueueLength{0, 1},
},
kubeClientObj: []client.Object{
builder.ForDataDownload("velero", "test-1").Labels(map[string]string{ExposeOnGoingLabel: "true"}).Result(),
builder.ForPodVolumeRestore("velero", "test-1").Labels(map[string]string{ExposeOnGoingLabel: "true"}).Result(),
},
expected: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scheme := runtime.NewScheme()
if !test.getErr {
err := velerov1api.AddToScheme(scheme)
require.NoError(t, err)
err = velerov2alpha1api.AddToScheme(scheme)
require.NoError(t, err)
}
test.counter.client = fake.NewClientBuilder().WithScheme(scheme).WithObjects(test.kubeClientObj...).Build()
result := test.counter.IsConstrained(context.TODO(), velerotest.NewLogger())
assert.Equal(t, test.expected, result)
if !test.getErr {
assert.Equal(t, test.counter.duState.changeID, test.counter.duCacheState.changeID)
assert.Equal(t, test.counter.ddState.changeID, test.counter.ddCacheState.changeID)
assert.Equal(t, test.counter.pvbState.changeID, test.counter.pvbCacheState.changeID)
assert.Equal(t, test.counter.pvrState.changeID, test.counter.pvrCacheState.changeID)
} else {
or := test.counter.duState.changeID != test.counter.duCacheState.changeID
if !or {
or = test.counter.ddState.changeID != test.counter.ddCacheState.changeID
}
if !or {
or = test.counter.pvbState.changeID != test.counter.pvbCacheState.changeID
}
if !or {
or = test.counter.pvrState.changeID != test.counter.pvrCacheState.changeID
}
assert.True(t, or)
}
})
}
}


@@ -62,6 +62,9 @@ type LoadConcurrency struct {
// PerNodeConfig specifies the concurrency number to nodes matched by rules
PerNodeConfig []RuledConfigs `json:"perNodeConfig,omitempty"`
// PrepareQueueLength specifies the max number of loads that are under expose
PrepareQueueLength int `json:"prepareQueueLength,omitempty"`
}
type LoadAffinity struct {
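PrepareQueueLength is the knob that turns the constraint on: the node-agent server (see its diff above) starts the VgdpCounter only when the data path configs carry LoadConcurrency.PrepareQueueLength > 0, and otherwise logs that VGDP loads are not constrained. Below is a minimal sketch using only the fields visible in this change; how the value reaches nodeagent.Configs (normally the node-agent configuration) is unchanged by this commit, and the value 10 is just an example.

// Minimal sketch, assuming only what this diff shows: nodeagent.Configs has a
// LoadConcurrency pointer field and LoadConcurrency now carries PrepareQueueLength.
// A zero or absent value keeps the counter disabled (the server checks > 0).
cfg := &nodeagent.Configs{
	LoadConcurrency: &nodeagent.LoadConcurrency{
		PrepareQueueLength: 10, // at most 10 loads may be under expose (Accepted..Prepared) at once
	},
}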