Files
velero/pkg/exposer/vgdp_counter.go
lyndon-li 06d305ea47
Some checks failed
Run the E2E test on kind / build (push) Failing after 7m38s
Run the E2E test on kind / setup-test-matrix (push) Successful in 4s
Run the E2E test on kind / run-e2e-test (push) Has been skipped
Main CI / Build (push) Failing after 39s
Close stale issues and PRs / stale (push) Successful in 22s
Trivy Nightly Scan / Trivy nightly scan (velero, main) (push) Failing after 1m32s
Trivy Nightly Scan / Trivy nightly scan (velero-plugin-for-aws, main) (push) Failing after 1m41s
Trivy Nightly Scan / Trivy nightly scan (velero-plugin-for-gcp, main) (push) Failing after 1m30s
Trivy Nightly Scan / Trivy nightly scan (velero-plugin-for-microsoft-azure, main) (push) Failing after 1m18s
Issue 8344: constrain data path expose (#9064)
* issue 8344: constrain data path exposure.

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
2025-07-18 13:32:45 +08:00

224 lines
7.3 KiB
Go

package exposer
import (
"context"
"sync/atomic"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"sigs.k8s.io/controller-runtime/pkg/manager"
ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
)
// dynamicQueueLength pairs a queue length with a change counter.
//
// For the live states of VgdpCounter (duState, ddState, pvbState, pvrState)
// only changeID is used: the informer handlers increment it atomically and
// StartVgdpCounter seeds it atomically. For the cache states (duCacheState,
// ...) queueLength is the count last listed by IsConstrained and changeID is
// the live changeID value that count was listed at.
//
// changeID is deliberately the first field and the struct is padded so that
// its size is a multiple of 8 bytes on both 32- and 64-bit platforms. Together
// with the field order in VgdpCounter this keeps every atomically accessed
// changeID 64-bit aligned, which sync/atomic requires on 386, ARM and 32-bit
// MIPS (otherwise the 64-bit atomic functions panic there).
type dynamicQueueLength struct {
	changeID    uint64
	queueLength int
	_           [4]byte // alignment padding, see above
}

// VgdpCounter tracks how many DataUploads, DataDownloads, PodVolumeBackups and
// PodVolumeRestores carry ExposeOnGoingLabel="true" and reports, through
// IsConstrained, whether their total has reached allowedQueueLength.
type VgdpCounter struct {
	// Live change counters, incremented atomically by the informer event
	// handlers. They must stay the first fields: the first word of an
	// allocated struct is guaranteed to be 64-bit aligned, and
	// dynamicQueueLength is sized so the following ones stay aligned too.
	duState  dynamicQueueLength
	ddState  dynamicQueueLength
	pvbState dynamicQueueLength
	pvrState dynamicQueueLength

	client             ctlclient.Client
	allowedQueueLength int

	// Cached per-kind counts and the live changeID each was listed at; only
	// accessed non-atomically by IsConstrained.
	duCacheState  dynamicQueueLength
	ddCacheState  dynamicQueueLength
	pvbCacheState dynamicQueueLength
	pvrCacheState dynamicQueueLength
}
// StartVgdpCounter builds a VgdpCounter that uses mgr's client for listing,
// allows a total of queueLength expose-ongoing objects, and registers its
// phase-change handlers on mgr's informers. It returns an error if any
// informer cannot be obtained or any handler cannot be registered.
func StartVgdpCounter(ctx context.Context, mgr manager.Manager, queueLength int) (*VgdpCounter, error) {
	w := &VgdpCounter{
		allowedQueueLength: queueLength,
		client:             mgr.GetClient(),
	}

	// Seed every live change counter with 1 while the cache states still hold
	// 0, so the very first IsConstrained call lists all four kinds.
	for _, live := range []*dynamicQueueLength{&w.duState, &w.ddState, &w.pvbState, &w.pvrState} {
		atomic.StoreUint64(&live.changeID, 1)
	}

	err := w.initListeners(ctx, mgr)
	if err != nil {
		return nil, err
	}

	return w, nil
}
// initListeners adds an Update event handler to mgr's cached informers for
// DataUpload, DataDownload, PodVolumeBackup and PodVolumeRestore, in that
// order. Each handler atomically increments the matching live change counter
// (duState, ddState, pvbState, pvrState) when vgdpQueueChanged returns true for
// the old/new phase pair, so the next IsConstrained call re-lists that kind.
// It returns the first informer lookup or handler registration error, wrapped
// with a message naming the kind.
func (w *VgdpCounter) initListeners(ctx context.Context, mgr manager.Manager) error {
	if err := w.watchQueueChange(ctx, mgr, &velerov2alpha1api.DataUpload{}, "du", &w.duState.changeID,
		func(oldObj, newObj any) bool {
			return vgdpQueueChanged(
				oldObj.(*velerov2alpha1api.DataUpload).Status.Phase,
				newObj.(*velerov2alpha1api.DataUpload).Status.Phase,
				velerov2alpha1api.DataUploadPhaseAccepted,
				velerov2alpha1api.DataUploadPhasePrepared,
			)
		},
	); err != nil {
		return err
	}

	if err := w.watchQueueChange(ctx, mgr, &velerov2alpha1api.DataDownload{}, "dd", &w.ddState.changeID,
		func(oldObj, newObj any) bool {
			return vgdpQueueChanged(
				oldObj.(*velerov2alpha1api.DataDownload).Status.Phase,
				newObj.(*velerov2alpha1api.DataDownload).Status.Phase,
				velerov2alpha1api.DataDownloadPhaseAccepted,
				velerov2alpha1api.DataDownloadPhasePrepared,
			)
		},
	); err != nil {
		return err
	}

	if err := w.watchQueueChange(ctx, mgr, &velerov1api.PodVolumeBackup{}, "PVB", &w.pvbState.changeID,
		func(oldObj, newObj any) bool {
			return vgdpQueueChanged(
				oldObj.(*velerov1api.PodVolumeBackup).Status.Phase,
				newObj.(*velerov1api.PodVolumeBackup).Status.Phase,
				velerov1api.PodVolumeBackupPhaseAccepted,
				velerov1api.PodVolumeBackupPhasePrepared,
			)
		},
	); err != nil {
		return err
	}

	if err := w.watchQueueChange(ctx, mgr, &velerov1api.PodVolumeRestore{}, "PVR", &w.pvrState.changeID,
		func(oldObj, newObj any) bool {
			return vgdpQueueChanged(
				oldObj.(*velerov1api.PodVolumeRestore).Status.Phase,
				newObj.(*velerov1api.PodVolumeRestore).Status.Phase,
				velerov1api.PodVolumeRestorePhaseAccepted,
				velerov1api.PodVolumeRestorePhasePrepared,
			)
		},
	); err != nil {
		return err
	}

	return nil
}

// watchQueueChange gets the informer for obj's kind from mgr's cache and adds
// an Update handler that atomically increments *changeID whenever changed
// returns true for the (old, new) object pair. kind is used only to build the
// error messages "error getting <kind> informer" and
// "error registering <kind> handler".
func (w *VgdpCounter) watchQueueChange(ctx context.Context, mgr manager.Manager, obj ctlclient.Object, kind string,
	changeID *uint64, changed func(oldObj, newObj any) bool) error {
	informer, err := mgr.GetCache().GetInformer(ctx, obj)
	if err != nil {
		return errors.Wrap(err, "error getting "+kind+" informer")
	}

	if _, err := informer.AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			UpdateFunc: func(oldObj, newObj any) {
				if changed(oldObj, newObj) {
					atomic.AddUint64(changeID, 1)
				}
			},
		},
	); err != nil {
		return errors.Wrap(err, "error registering "+kind+" handler")
	}

	return nil
}

// vgdpQueueChanged reports whether a phase transition oldPhase -> newPhase
// should invalidate the cached queue length of its kind. An unchanged phase
// never does. Otherwise it returns true when the object enters accepted,
// leaves prepared, or leaves accepted for any phase other than prepared;
// accepted -> prepared is excluded.
// NOTE(review): the exclusion presumably reflects that objects in both
// accepted and prepared carry ExposeOnGoingLabel — confirm against the
// controllers that set and clear that label.
func vgdpQueueChanged[P comparable](oldPhase, newPhase, accepted, prepared P) bool {
	if oldPhase == newPhase {
		return false
	}

	return newPhase == accepted ||
		oldPhase == prepared ||
		(oldPhase == accepted && newPhase != prepared)
}
// IsConstrained reports whether the combined number of DataUploads,
// DataDownloads, PodVolumeBackups and PodVolumeRestores labeled
// ExposeOnGoingLabel="true" is at least allowedQueueLength.
//
// Each kind is re-listed through w.client only when its live change counter
// differs from the changeID recorded in its cache state; otherwise the cached
// count is reused. The live counter is read before listing, so a change that
// races with the List is picked up on a later call. If a List fails, a warning
// is logged and the previous cached count and changeID are kept, so that kind
// is retried on the next call.
//
// NOTE(review): the cache states are read and written here without any
// synchronization; if IsConstrained can run on several goroutines at once
// (e.g. several controllers sharing one counter) this is a data race —
// confirm with the callers.
func (w *VgdpCounter) IsConstrained(ctx context.Context, log logrus.FieldLogger) bool {
	type kindQueue struct {
		live     *dynamicQueueLength
		cached   *dynamicQueueLength
		list     ctlclient.ObjectList
		size     func() int
		failMsg  string
		queryFmt string
	}

	duList := &velerov2alpha1api.DataUploadList{}
	ddList := &velerov2alpha1api.DataDownloadList{}
	pvbList := &velerov1api.PodVolumeBackupList{}
	pvrList := &velerov1api.PodVolumeRestoreList{}

	kinds := []kindQueue{
		{
			live: &w.duState, cached: &w.duCacheState, list: duList,
			size:     func() int { return len(duList.Items) },
			failMsg:  "Failed to list data uploads, skip counting",
			queryFmt: "Query queue length for du %d",
		},
		{
			live: &w.ddState, cached: &w.ddCacheState, list: ddList,
			size:     func() int { return len(ddList.Items) },
			failMsg:  "Failed to list data downloads, skip counting",
			queryFmt: "Query queue length for dd %d",
		},
		{
			live: &w.pvbState, cached: &w.pvbCacheState, list: pvbList,
			size:     func() int { return len(pvbList.Items) },
			failMsg:  "Failed to list PVB, skip counting",
			queryFmt: "Query queue length for pvb %d",
		},
		{
			live: &w.pvrState, cached: &w.pvrCacheState, list: pvrList,
			size:     func() int { return len(pvrList.Items) },
			failMsg:  "Failed to list PVR, skip counting",
			queryFmt: "Query queue length for pvr %d",
		},
	}

	total := 0
	for _, k := range kinds {
		id := atomic.LoadUint64(&k.live.changeID)
		if id != k.cached.changeID {
			opts := &ctlclient.ListOptions{
				LabelSelector: labels.SelectorFromSet(labels.Set(map[string]string{ExposeOnGoingLabel: "true"})),
			}
			if err := w.client.List(ctx, k.list, opts); err != nil {
				log.WithError(err).Warn(k.failMsg)
			} else {
				k.cached.queueLength = k.size()
				k.cached.changeID = id
				log.Infof(k.queryFmt, k.cached.queueLength)
			}
		}
		total += k.cached.queueLength
	}

	return total >= w.allowedQueueLength
}