Support pause/unpause schedules

Support pause/unpause schedule

Fixes #2363

Signed-off-by: Wenkai Yin(尹文开) <yinw@vmware.com>
This commit is contained in:
Wenkai Yin(尹文开)
2022-09-02 10:59:09 +08:00
parent 100d6b4430
commit 4b9dbfa416
21 changed files with 440 additions and 89 deletions

View File

@@ -0,0 +1 @@
Support pause/unpause schedules

View File

@@ -32,6 +32,9 @@ spec:
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
- jsonPath: .spec.paused
name: Paused
type: boolean
name: v1
schema:
openAPIV3Schema:
@@ -53,6 +56,9 @@ spec:
spec:
description: ScheduleSpec defines the specification for a Velero schedule
properties:
paused:
description: Paused specifies whether the schedule is paused or not
type: boolean
schedule:
description: Schedule is a Cron expression defining when to run the
Backup.

File diff suppressed because one or more lines are too long

View File

@@ -38,6 +38,10 @@ type ScheduleSpec struct {
// +optional
// +nullable
UseOwnerReferencesInBackup *bool `json:"useOwnerReferencesInBackup,omitempty"`
// Paused specifies whether the schedule is paused or not
// +optional
Paused bool `json:"paused,omitempty"`
}
// SchedulePhase is a string representation of the lifecycle phase
@@ -87,6 +91,7 @@ type ScheduleStatus struct {
// +kubebuilder:printcolumn:name="Schedule",type="string",JSONPath=".spec.schedule",description="A Cron expression defining when to run the Backup"
// +kubebuilder:printcolumn:name="LastBackup",type="date",JSONPath=".status.lastBackup",description="The last time a Backup was run for this schedule"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
// +kubebuilder:printcolumn:name="Paused",type="boolean",JSONPath=".spec.paused"
// Schedule is a Velero resource that represents a pre-scheduled or
// periodic Backup that should be run.

View File

@@ -27,24 +27,20 @@ import (
"github.com/spf13/pflag"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/cmd/util/flag"
clientset "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
)
// DeleteOptions contains parameters used for deleting a restore.
type DeleteOptions struct {
Names []string
all bool
Selector flag.LabelSelector
Confirm bool
Client clientset.Interface
Namespace string
singularTypeName string
*SelectOptions
Confirm bool
Client clientset.Interface
Namespace string
}
func NewDeleteOptions(singularTypeName string) *DeleteOptions {
o := &DeleteOptions{}
o.singularTypeName = singularTypeName
o.SelectOptions = NewSelectOptions("delete", singularTypeName)
return o
}
@@ -56,8 +52,7 @@ func (o *DeleteOptions) Complete(f client.Factory, args []string) error {
return err
}
o.Client = client
o.Names = args
return nil
return o.SelectOptions.Complete(args)
}
// Validate validates the fields of the DeleteOptions struct.
@@ -65,23 +60,14 @@ func (o *DeleteOptions) Validate(c *cobra.Command, f client.Factory, args []stri
if o.Client == nil {
return errors.New("Velero client is not set; unable to proceed")
}
var (
hasNames = len(o.Names) > 0
hasAll = o.all
hasSelector = o.Selector.LabelSelector != nil
)
if !xor(hasNames, hasAll, hasSelector) {
return errors.New("you must specify exactly one of: specific " + o.singularTypeName + " name(s), the --all flag, or the --selector flag")
}
return nil
return o.SelectOptions.Validate()
}
// BindFlags binds options for this command to flags.
func (o *DeleteOptions) BindFlags(flags *pflag.FlagSet) {
flags.BoolVar(&o.Confirm, "confirm", o.Confirm, "Confirm deletion")
flags.BoolVar(&o.all, "all", o.all, "Delete all "+o.singularTypeName+"s")
flags.VarP(&o.Selector, "selector", "l", "Delete all "+o.singularTypeName+"s matching this label selector.")
o.SelectOptions.BindFlags(flags)
}
// GetConfirmation ensures that the user confirms the action before proceeding.

View File

@@ -82,6 +82,7 @@ type CreateOptions struct {
BackupOptions *backup.CreateOptions
Schedule string
UseOwnerReferencesInBackup bool
Paused bool
labelSelector *metav1.LabelSelector
}
@@ -96,6 +97,7 @@ func (o *CreateOptions) BindFlags(flags *pflag.FlagSet) {
o.BackupOptions.BindFlags(flags)
flags.StringVar(&o.Schedule, "schedule", o.Schedule, "A cron expression specifying a recurring schedule for this backup to run")
flags.BoolVar(&o.UseOwnerReferencesInBackup, "use-owner-references-in-backup", o.UseOwnerReferencesInBackup, "Specifies whether to use OwnerReferences on backups created by this Schedule. Notice: if set to true, when schedule is deleted, backups will be deleted too.")
flags.BoolVar(&o.Paused, "paused", o.Paused, "Specifies whether the newly created schedule is paused or not.")
}
func (o *CreateOptions) Validate(c *cobra.Command, args []string, f client.Factory) error {
@@ -149,6 +151,7 @@ func (o *CreateOptions) Run(c *cobra.Command, f client.Factory) error {
},
Schedule: o.Schedule,
UseOwnerReferencesInBackup: &o.UseOwnerReferencesInBackup,
Paused: o.Paused,
},
}

View File

@@ -0,0 +1,122 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedule
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/cmd"
"github.com/vmware-tanzu/velero/pkg/cmd/cli"
)
// NewPauseCommand creates the command for pause
func NewPauseCommand(f client.Factory, use string) *cobra.Command {
o := cli.NewSelectOptions("pause", "schedule")
c := &cobra.Command{
Use: use,
Short: "Pause schedules",
Example: ` # Pause a schedule named "schedule-1".
velero schedule pause schedule-1
# Pause schedules named "schedule-1" and "schedule-2".
velero schedule pause schedule-1 schedule-2
# Pause all schedules labelled with "foo=bar".
velero schedule pause --selector foo=bar
# Pause all schedules.
velero schedule pause --all`,
Run: func(c *cobra.Command, args []string) {
cmd.CheckError(o.Complete(args))
cmd.CheckError(o.Validate())
cmd.CheckError(runPause(f, o, true))
},
}
o.BindFlags(c.Flags())
return c
}
func runPause(f client.Factory, o *cli.SelectOptions, paused bool) error {
client, err := f.Client()
if err != nil {
return err
}
var (
schedules []*velerov1api.Schedule
errs []error
)
switch {
case len(o.Names) > 0:
for _, name := range o.Names {
schedule, err := client.VeleroV1().Schedules(f.Namespace()).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
errs = append(errs, errors.WithStack(err))
continue
}
schedules = append(schedules, schedule)
}
default:
selector := labels.Everything().String()
if o.Selector.LabelSelector != nil {
selector = o.Selector.String()
}
res, err := client.VeleroV1().Schedules(f.Namespace()).List(context.TODO(), metav1.ListOptions{
LabelSelector: selector,
})
if err != nil {
errs = append(errs, errors.WithStack(err))
}
for i := range res.Items {
schedules = append(schedules, &res.Items[i])
}
}
if len(schedules) == 0 {
fmt.Println("No schedules found")
return nil
}
msg := "paused"
if !paused {
msg = "unpaused"
}
for _, schedule := range schedules {
if schedule.Spec.Paused == paused {
fmt.Printf("Schedule %s is already %s, skip\n", schedule.Name, msg)
continue
}
schedule.Spec.Paused = paused
if _, err := client.VeleroV1().Schedules(schedule.Namespace).Update(context.TODO(), schedule, metav1.UpdateOptions{}); err != nil {
return errors.Wrapf(err, "failed to update schedule %s", schedule.Name)
}
fmt.Printf("Schedule %s %s successfully\n", schedule.Name, msg)
}
return kubeerrs.NewAggregate(errs)
}

View File

@@ -34,6 +34,8 @@ func NewCommand(f client.Factory) *cobra.Command {
NewGetCommand(f, "get"),
NewDescribeCommand(f, "describe"),
NewDeleteCommand(f, "delete"),
NewPauseCommand(f, "pause"),
NewUnpauseCommand(f, "unpause"),
)
return c

View File

@@ -0,0 +1,55 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedule
import (
"github.com/spf13/cobra"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/cmd"
"github.com/vmware-tanzu/velero/pkg/cmd/cli"
)
// NewUnpauseCommand creates the command for unpause
func NewUnpauseCommand(f client.Factory, use string) *cobra.Command {
o := cli.NewSelectOptions("pause", "schedule")
c := &cobra.Command{
Use: use,
Short: "Unpause schedules",
Example: ` # Unpause a schedule named "schedule-1".
velero schedule unpause schedule-1
# Unpause schedules named "schedule-1" and "schedule-2".
velero schedule unpause schedule-1 schedule-2
# Unpause all schedules labelled with "foo=bar".
velero schedule unpause --selector foo=bar
# Unpause all schedules.
velero schedule unpause --all`,
Run: func(c *cobra.Command, args []string) {
cmd.CheckError(o.Complete(args))
cmd.CheckError(o.Validate())
cmd.CheckError(runPause(f, o, false))
},
}
o.BindFlags(c.Flags())
return c
}

View File

@@ -0,0 +1,69 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cli
import (
"errors"
"strings"
"github.com/spf13/pflag"
"github.com/vmware-tanzu/velero/pkg/cmd/util/flag"
)
// SelectOptions defines the options for selecting resources
type SelectOptions struct {
Names []string
All bool
Selector flag.LabelSelector
CMD string
SingularTypeName string
}
// NewSelectOptions creates a new option for selector
func NewSelectOptions(cmd, singularTypeName string) *SelectOptions {
return &SelectOptions{
CMD: cmd,
SingularTypeName: singularTypeName,
}
}
// Complete fills in the correct values for all the options.
func (o *SelectOptions) Complete(args []string) error {
o.Names = args
return nil
}
// Validate validates the fields of the SelectOptions struct.
func (o *SelectOptions) Validate() error {
var (
hasNames = len(o.Names) > 0
hasAll = o.All
hasSelector = o.Selector.LabelSelector != nil
)
if !xor(hasNames, hasAll, hasSelector) {
return errors.New("you must specify exactly one of: specific " + o.SingularTypeName + " name(s), the --all flag, or the --selector flag")
}
return nil
}
// BindFlags binds options for this command to flags.
func (o *SelectOptions) BindFlags(flags *pflag.FlagSet) {
flags.BoolVar(&o.All, "all", o.All, strings.Title(o.CMD)+" all "+o.SingularTypeName+"s")
flags.VarP(&o.Selector, "selector", "l", strings.Title(o.CMD)+" all "+o.SingularTypeName+"s matching this label selector.")
}

View File

@@ -0,0 +1,45 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cli
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vmware-tanzu/velero/pkg/cmd/util/flag"
)
func TestCompleteOfSelectOption(t *testing.T) {
option := &SelectOptions{}
args := []string{"arg1", "arg2"}
require.Nil(t, option.Complete(args))
assert.Equal(t, args, option.Names)
}
func TestValidateOfSelectOption(t *testing.T) {
option := &SelectOptions{
Names: nil,
Selector: flag.LabelSelector{},
All: false,
}
assert.NotNil(t, option.Validate())
option.All = true
assert.Nil(t, option.Validate())
}

View File

@@ -52,6 +52,9 @@ func DescribeSchedule(schedule *v1.Schedule) string {
}
}
d.Println()
d.Printf("Paused:\t%t\n", schedule.Spec.Paused)
d.Println()
DescribeScheduleSpec(d, schedule.Spec)

View File

@@ -36,6 +36,7 @@ var (
{Name: "Backup TTL"},
{Name: "Last Backup"},
{Name: "Selector"},
{Name: "Paused"},
}
)
@@ -71,6 +72,7 @@ func printSchedule(schedule *v1.Schedule) []metav1.TableRow {
schedule.Spec.Template.TTL.Duration,
humanReadableTimeFromNow(lastBackupTime),
metav1.FormatLabelSelector(schedule.Spec.Template.LabelSelector),
schedule.Spec.Paused,
)
return []metav1.TableRow{row}

View File

@@ -207,18 +207,15 @@ func (r *backupStorageLocationReconciler) SetupWithManager(mgr ctrl.Manager) err
mgr.GetClient(),
&velerov1api.BackupStorageLocationList{},
bslValidationEnqueuePeriod,
kube.PeriodicalEnqueueSourceOption{
FilterFuncs: []func(object client.Object) bool{
func(object client.Object) bool {
location := object.(*velerov1api.BackupStorageLocation)
return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, r.defaultBackupLocationInfo.ServerValidationFrequency, r.log.WithField("controller", BackupStorageLocation))
},
},
},
kube.PeriodicalEnqueueSourceOption{},
)
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
location := object.(*velerov1api.BackupStorageLocation)
return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, r.defaultBackupLocationInfo.ServerValidationFrequency, r.log.WithField("controller", BackupStorageLocation))
})
return ctrl.NewControllerManagedBy(mgr).
// As the "status.LastValidationTime" field is always updated, this triggers new reconciling process, skip the update event that include no spec change to avoid the reconcile loop
For(&velerov1api.BackupStorageLocation{}, builder.WithPredicates(kube.SpecChangePredicate{})).
Watches(g, nil).
Watches(g, nil, builder.WithPredicates(gp)).
Complete(r)
}

View File

@@ -31,19 +31,17 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"sigs.k8s.io/controller-runtime/pkg/builder"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/features"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/persistence"
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
"github.com/vmware-tanzu/velero/pkg/util/kube"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
const (
@@ -286,33 +284,18 @@ func (b *backupSyncReconciler) SetupWithManager(mgr ctrl.Manager) error {
backupSyncReconcilePeriod,
kube.PeriodicalEnqueueSourceOption{
OrderFunc: backupSyncSourceOrderFunc,
FilterFuncs: []func(object client.Object) bool{
func(object client.Object) bool {
location := object.(*velerov1api.BackupStorageLocation)
return b.locationFilterFunc(location)
},
},
},
)
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
location := object.(*velerov1api.BackupStorageLocation)
return b.locationFilterFunc(location)
})
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.BackupStorageLocation{}).
// Filter all BSL events, because this controller is supposed to run periodically, not by event.
WithEventFilter(predicate.Funcs{
CreateFunc: func(ce event.CreateEvent) bool {
return false
},
UpdateFunc: func(ue event.UpdateEvent) bool {
return false
},
DeleteFunc: func(de event.DeleteEvent) bool {
return false
},
GenericFunc: func(ge event.GenericEvent) bool {
return false
},
}).
Watches(backupSyncSource, nil).
For(&velerov1api.BackupStorageLocation{}, builder.WithPredicates(kube.FalsePredicate{})).
Watches(backupSyncSource, nil, builder.WithPredicates(gp)).
Complete(b)
}

View File

@@ -25,6 +25,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -75,8 +76,7 @@ func NewGCReconciler(
func (c *gcReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1api.BackupList{}, c.frequency, kube.PeriodicalEnqueueSourceOption{})
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.Backup{}).
WithEventFilter(predicate.Funcs{
For(&velerov1api.Backup{}, builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
return false
},
@@ -86,7 +86,7 @@ func (c *gcReconciler) SetupWithManager(mgr ctrl.Manager) error {
GenericFunc: func(ge event.GenericEvent) bool {
return false
},
}).
})).
Watches(s, nil).
Complete(c)
}

View File

@@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
ctrl "sigs.k8s.io/controller-runtime"
bld "sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
@@ -67,7 +68,16 @@ func NewScheduleReconciler(
func (c *scheduleReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1.ScheduleList{}, scheduleSyncPeriod, kube.PeriodicalEnqueueSourceOption{})
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1.Schedule{}).
// global predicate, works for both For and Watch
WithEventFilter(kube.NewAllEventPredicate(func(obj client.Object) bool {
schedule := obj.(*velerov1.Schedule)
if pause := schedule.Spec.Paused; pause {
c.logger.Infof("schedule %s is paused, skip", schedule.Name)
return false
}
return true
})).
For(&velerov1.Schedule{}, bld.WithPredicates(kube.SpecChangePredicate{})).
Watches(s, nil).
Complete(c)
}
@@ -89,13 +99,6 @@ func (c *scheduleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, errors.Wrapf(err, "error getting schedule %s", req.String())
}
if schedule.Status.Phase != "" &&
schedule.Status.Phase != velerov1.SchedulePhaseNew &&
schedule.Status.Phase != velerov1.SchedulePhaseEnabled {
log.Debugf("the schedule phase is %s, isn't %s or %s, skip", schedule.Status.Phase, velerov1.SchedulePhaseNew, velerov1.SchedulePhaseEnabled)
return ctrl.Result{}, nil
}
c.metrics.InitSchedule(schedule.Name)
original := schedule.DeepCopy()
@@ -124,7 +127,8 @@ func (c *scheduleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}
// check for the schedule being due to run, and submit a Backup if so
// check for the schedule being due to run, and submit a Backup if so.
// As the schedule must be validated before checking whether it's due, we cannot put the checking log in Predicate
if err := c.submitBackupIfDue(ctx, schedule, cronSchedule); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "error running submitBackupIfDue for schedule %s", req.String())
}

View File

@@ -23,13 +23,13 @@ import (
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
@@ -61,11 +61,11 @@ type PeriodicalEnqueueSource struct {
}
type PeriodicalEnqueueSourceOption struct {
FilterFuncs []func(object client.Object) bool
OrderFunc func(objList client.ObjectList) client.ObjectList
OrderFunc func(objList client.ObjectList) client.ObjectList
}
func (p *PeriodicalEnqueueSource) Start(ctx context.Context, h handler.EventHandler, q workqueue.RateLimitingInterface, pre ...predicate.Predicate) error {
// Start enqueue items periodically. The predicates only apply to the GenericEvent
func (p *PeriodicalEnqueueSource) Start(ctx context.Context, h handler.EventHandler, q workqueue.RateLimitingInterface, predicates ...predicate.Predicate) error {
go wait.Until(func() {
p.logger.Debug("enqueueing resources ...")
if err := p.List(ctx, p.objList); err != nil {
@@ -80,19 +80,19 @@ func (p *PeriodicalEnqueueSource) Start(ctx context.Context, h handler.EventHand
p.objList = p.option.OrderFunc(p.objList)
}
if err := meta.EachListItem(p.objList, func(object runtime.Object) error {
obj, ok := object.(metav1.Object)
obj, ok := object.(client.Object)
if !ok {
p.logger.Error("%s's type isn't metav1.Object", object.GetObjectKind().GroupVersionKind().String())
return nil
}
for _, filter := range p.option.FilterFuncs {
if filter != nil {
if enqueueObj := filter(object.(client.Object)); !enqueueObj {
p.logger.Debugf("skip enqueue object %s/%s due to filter function.", obj.GetNamespace(), obj.GetName())
return nil
}
event := event.GenericEvent{Object: obj}
for _, predicate := range predicates {
if !predicate.Generic(event) {
p.logger.Debugf("skip enqueue object %s/%s due to the predicate.", obj.GetNamespace(), obj.GetName())
return nil
}
}
q.Add(ctrl.Request{
NamespacedName: types.NamespacedName{
Namespace: obj.GetNamespace(),

View File

@@ -68,7 +68,7 @@ func TestStart(t *testing.T) {
require.Equal(t, 0, queue.Len())
}
func TestFilter(t *testing.T) {
func TestPredicate(t *testing.T) {
require.Nil(t, velerov1.AddToScheme(scheme.Scheme))
ctx, cancelFunc := context.WithCancel(context.TODO())
@@ -79,15 +79,13 @@ func TestFilter(t *testing.T) {
client,
&velerov1.BackupStorageLocationList{},
1*time.Second,
PeriodicalEnqueueSourceOption{
FilterFuncs: []func(object crclient.Object) bool{func(object crclient.Object) bool {
location := object.(*velerov1.BackupStorageLocation)
return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, 1*time.Minute, logrus.WithContext(ctx).WithField("BackupStorageLocation", location.Name))
}},
},
PeriodicalEnqueueSourceOption{},
)
require.Nil(t, source.Start(ctx, nil, queue))
require.Nil(t, source.Start(ctx, nil, queue, NewGenericEventPredicate(func(object crclient.Object) bool {
location := object.(*velerov1.BackupStorageLocation)
return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, 1*time.Minute, logrus.WithContext(ctx).WithField("BackupStorageLocation", location.Name))
})))
// Should not patch a backup storage location object status phase
// if the location's validation frequency is specifically set to zero

View File

@@ -19,6 +19,7 @@ package kube
import (
"reflect"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
@@ -45,3 +46,53 @@ func (SpecChangePredicate) Update(e event.UpdateEvent) bool {
newSpec := reflect.ValueOf(e.ObjectNew).Elem().FieldByName("Spec")
return !reflect.DeepEqual(oldSpec.Interface(), newSpec.Interface())
}
// NewGenericEventPredicate creates a new Predicate that checks the Generic event with the provided func
func NewGenericEventPredicate(f func(object client.Object) bool) predicate.Predicate {
return predicate.Funcs{
GenericFunc: func(event event.GenericEvent) bool {
return f(event.Object)
},
}
}
// NewAllEventPredicate creates a new Predicate that checks all the events with the provided func
func NewAllEventPredicate(f func(object client.Object) bool) predicate.Predicate {
return predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
return f(event.Object)
},
DeleteFunc: func(event event.DeleteEvent) bool {
return f(event.Object)
},
UpdateFunc: func(event event.UpdateEvent) bool {
return f(event.ObjectNew)
},
GenericFunc: func(event event.GenericEvent) bool {
return f(event.Object)
},
}
}
// FalsePredicate always returns false for all kinds of events
type FalsePredicate struct{}
// Create always returns false
func (f FalsePredicate) Create(event.CreateEvent) bool {
return false
}
// Delete always returns false
func (f FalsePredicate) Delete(event.DeleteEvent) bool {
return false
}
// Update always returns false
func (f FalsePredicate) Update(event.UpdateEvent) bool {
return false
}
// Generic always returns false
func (f FalsePredicate) Generic(event.GenericEvent) bool {
return false
}

View File

@@ -178,3 +178,22 @@ func TestSpecChangePredicate(t *testing.T) {
})
}
}
func TestNewGenericEventPredicate(t *testing.T) {
predicate := NewGenericEventPredicate(func(object client.Object) bool {
return false
})
assert.False(t, predicate.Generic(event.GenericEvent{}))
}
func TestNewAllEventPredicate(t *testing.T) {
predicate := NewAllEventPredicate(func(object client.Object) bool {
return false
})
assert.False(t, predicate.Create(event.CreateEvent{}))
assert.False(t, predicate.Update(event.UpdateEvent{}))
assert.False(t, predicate.Delete(event.DeleteEvent{}))
assert.False(t, predicate.Generic(event.GenericEvent{}))
}