Merge pull request #5279 from ywk253100/220829_pause

Support pause/unpause schedules
This commit is contained in:
qiuming
2022-09-20 16:06:17 +08:00
committed by GitHub
21 changed files with 440 additions and 89 deletions

View File

@@ -207,18 +207,15 @@ func (r *backupStorageLocationReconciler) SetupWithManager(mgr ctrl.Manager) err
mgr.GetClient(),
&velerov1api.BackupStorageLocationList{},
bslValidationEnqueuePeriod,
kube.PeriodicalEnqueueSourceOption{
FilterFuncs: []func(object client.Object) bool{
func(object client.Object) bool {
location := object.(*velerov1api.BackupStorageLocation)
return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, r.defaultBackupLocationInfo.ServerValidationFrequency, r.log.WithField("controller", BackupStorageLocation))
},
},
},
kube.PeriodicalEnqueueSourceOption{},
)
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
location := object.(*velerov1api.BackupStorageLocation)
return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, r.defaultBackupLocationInfo.ServerValidationFrequency, r.log.WithField("controller", BackupStorageLocation))
})
return ctrl.NewControllerManagedBy(mgr).
// As the "status.LastValidationTime" field is always updated, this triggers new reconciling process, skip the update event that include no spec change to avoid the reconcile loop
For(&velerov1api.BackupStorageLocation{}, builder.WithPredicates(kube.SpecChangePredicate{})).
Watches(g, nil).
Watches(g, nil, builder.WithPredicates(gp)).
Complete(r)
}

View File

@@ -30,19 +30,17 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"sigs.k8s.io/controller-runtime/pkg/builder"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/features"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/persistence"
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
"github.com/vmware-tanzu/velero/pkg/util/kube"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
const (
@@ -285,33 +283,18 @@ func (b *backupSyncReconciler) SetupWithManager(mgr ctrl.Manager) error {
backupSyncReconcilePeriod,
kube.PeriodicalEnqueueSourceOption{
OrderFunc: backupSyncSourceOrderFunc,
FilterFuncs: []func(object client.Object) bool{
func(object client.Object) bool {
location := object.(*velerov1api.BackupStorageLocation)
return b.locationFilterFunc(location)
},
},
},
)
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
location := object.(*velerov1api.BackupStorageLocation)
return b.locationFilterFunc(location)
})
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.BackupStorageLocation{}).
// Filter all BSL events, because this controller is supposed to run periodically, not by event.
WithEventFilter(predicate.Funcs{
CreateFunc: func(ce event.CreateEvent) bool {
return false
},
UpdateFunc: func(ue event.UpdateEvent) bool {
return false
},
DeleteFunc: func(de event.DeleteEvent) bool {
return false
},
GenericFunc: func(ge event.GenericEvent) bool {
return false
},
}).
Watches(backupSyncSource, nil).
For(&velerov1api.BackupStorageLocation{}, builder.WithPredicates(kube.FalsePredicate{})).
Watches(backupSyncSource, nil, builder.WithPredicates(gp)).
Complete(b)
}

View File

@@ -25,6 +25,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -75,8 +76,7 @@ func NewGCReconciler(
func (c *gcReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1api.BackupList{}, c.frequency, kube.PeriodicalEnqueueSourceOption{})
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.Backup{}).
WithEventFilter(predicate.Funcs{
For(&velerov1api.Backup{}, builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
return false
},
@@ -86,7 +86,7 @@ func (c *gcReconciler) SetupWithManager(mgr ctrl.Manager) error {
GenericFunc: func(ge event.GenericEvent) bool {
return false
},
}).
})).
Watches(s, nil).
Complete(c)
}

View File

@@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
ctrl "sigs.k8s.io/controller-runtime"
bld "sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
@@ -67,7 +68,16 @@ func NewScheduleReconciler(
func (c *scheduleReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1.ScheduleList{}, scheduleSyncPeriod, kube.PeriodicalEnqueueSourceOption{})
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1.Schedule{}).
// global predicate, works for both For and Watch
WithEventFilter(kube.NewAllEventPredicate(func(obj client.Object) bool {
schedule := obj.(*velerov1.Schedule)
if pause := schedule.Spec.Paused; pause {
c.logger.Infof("schedule %s is paused, skip", schedule.Name)
return false
}
return true
})).
For(&velerov1.Schedule{}, bld.WithPredicates(kube.SpecChangePredicate{})).
Watches(s, nil).
Complete(c)
}
@@ -89,13 +99,6 @@ func (c *scheduleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, errors.Wrapf(err, "error getting schedule %s", req.String())
}
if schedule.Status.Phase != "" &&
schedule.Status.Phase != velerov1.SchedulePhaseNew &&
schedule.Status.Phase != velerov1.SchedulePhaseEnabled {
log.Debugf("the schedule phase is %s, isn't %s or %s, skip", schedule.Status.Phase, velerov1.SchedulePhaseNew, velerov1.SchedulePhaseEnabled)
return ctrl.Result{}, nil
}
c.metrics.InitSchedule(schedule.Name)
original := schedule.DeepCopy()
@@ -124,7 +127,8 @@ func (c *scheduleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}
// check for the schedule being due to run, and submit a Backup if so
// check for the schedule being due to run, and submit a Backup if so.
// As the schedule must be validated before checking whether it's due, we cannot put the checking log in Predicate
if err := c.submitBackupIfDue(ctx, schedule, cronSchedule); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "error running submitBackupIfDue for schedule %s", req.String())
}