diff --git a/changelogs/unreleased/4894-jxun b/changelogs/unreleased/4894-jxun new file mode 100644 index 000000000..dcac05202 --- /dev/null +++ b/changelogs/unreleased/4894-jxun @@ -0,0 +1 @@ +Refactor BSL controller with periodical enqueue source \ No newline at end of file diff --git a/pkg/controller/backup_storage_location_controller.go b/pkg/controller/backup_storage_location_controller.go index e6abd9b35..627247be1 100644 --- a/pkg/controller/backup_storage_location_controller.go +++ b/pkg/controller/backup_storage_location_controller.go @@ -29,11 +29,18 @@ import ( "sigs.k8s.io/cluster-api/util/patch" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/vmware-tanzu/velero/internal/storage" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/persistence" "github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt" + "github.com/vmware-tanzu/velero/pkg/util/kube" +) + +const ( + backupStorageLocationSyncPeriod = 1 * time.Minute ) // BackupStorageLocationReconciler reconciles a BackupStorageLocation object @@ -53,96 +60,92 @@ type BackupStorageLocationReconciler struct { // +kubebuilder:rbac:groups=velero.io,resources=backupstoragelocations,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=velero.io,resources=backupstoragelocations/status,verbs=get;update;patch func (r *BackupStorageLocationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Log.WithField("controller", BackupStorageLocation) + var unavailableErrors []string + var location velerov1api.BackupStorageLocation - log.Debug("Validating availability of backup storage locations.") + log := r.Log.WithField("controller", BackupStorageLocation).WithField(BackupStorageLocation, req.NamespacedName.String()) + log.Debug("Validating availability of BackupStorageLocation") locationList, err := storage.ListBackupStorageLocations(r.Ctx, r.Client, req.Namespace) if err != nil { - log.WithError(err).Error("No backup storage locations found, at least one is required") - return ctrl.Result{}, err + log.WithError(err).Error("No BackupStorageLocations found, at least one is required") + return ctrl.Result{}, nil } pluginManager := r.NewPluginManager(log) defer pluginManager.CleanupClients() var defaultFound bool - for _, location := range locationList.Items { - if location.Spec.Default { + for _, bsl := range locationList.Items { + if bsl.Spec.Default { defaultFound = true - break + } + if bsl.Name == req.Name && bsl.Namespace == req.Namespace { + location = bsl } } - var unavailableErrors []string - var anyVerified bool - for i := range locationList.Items { - location := &locationList.Items[i] - isDefault := location.Spec.Default - log := r.Log.WithField("controller", BackupStorageLocation).WithField(BackupStorageLocation, location.Name) + if location.Name == "" || location.Namespace == "" { + log.WithError(err).Error("BackupStorageLocation is not found") + return ctrl.Result{}, nil + } - // TODO(2.0) remove this check since the server default will be deprecated - if !defaultFound && location.Name == r.DefaultBackupLocationInfo.StorageLocation { - // For backward-compatible, to configure the backup storage location as the default if - // none of the BSLs be marked as the default and the BSL name matches against the - // "velero server --default-backup-storage-location". - isDefault = true - defaultFound = true + isDefault := location.Spec.Default + + // TODO(2.0) remove this check since the server default will be deprecated + if !defaultFound && location.Name == r.DefaultBackupLocationInfo.StorageLocation { + // For backward-compatible, to configure the backup storage location as the default if + // none of the BSLs be marked as the default and the BSL name matches against the + // "velero server --default-backup-storage-location". + isDefault = true + defaultFound = true + } + + func() { + // Initialize the patch helper. + patchHelper, err := patch.NewHelper(&location, r.Client) + if err != nil { + log.WithError(err).Error("Error getting a patch helper to update BackupStorageLocation") + return } - - if !storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, r.DefaultBackupLocationInfo.ServerValidationFrequency, log) { - log.Debug("Validation not required, skipping...") - continue - } - - anyVerified = true - - func() { - // Initialize the patch helper. - patchHelper, err := patch.NewHelper(location, r.Client) + defer func() { + location.Status.LastValidationTime = &metav1.Time{Time: time.Now().UTC()} if err != nil { - log.WithError(err).Error("Error getting a patch helper to update this resource") - return + log.Info("BackupStorageLocation is invalid, marking as unavailable") + err = errors.Wrapf(err, "BackupStorageLocation %q is unavailable", location.Name) + unavailableErrors = append(unavailableErrors, err.Error()) + location.Status.Phase = velerov1api.BackupStorageLocationPhaseUnavailable + location.Status.Message = err.Error() + } else { + log.Info("BackupStorageLocations is valid, marking as available") + location.Status.Phase = velerov1api.BackupStorageLocationPhaseAvailable + location.Status.Message = "" } - defer func() { - location.Status.LastValidationTime = &metav1.Time{Time: time.Now().UTC()} - if err != nil { - log.Info("Backup storage location is invalid, marking as unavailable") - err = errors.Wrapf(err, "Backup storage location %q is unavailable", location.Name) - unavailableErrors = append(unavailableErrors, err.Error()) - location.Status.Phase = velerov1api.BackupStorageLocationPhaseUnavailable - location.Status.Message = err.Error() - } else { - log.Info("Backup storage location valid, marking as available") - location.Status.Phase = velerov1api.BackupStorageLocationPhaseAvailable - location.Status.Message = "" - } - if err := patchHelper.Patch(r.Ctx, location); err != nil { - log.WithError(err).Error("Error updating backup storage location phase") - } - }() - - backupStore, err := r.BackupStoreGetter.Get(location, pluginManager, log) - if err != nil { - err = errors.Wrapf(err, "Error getting a backup store") - return + if err := patchHelper.Patch(r.Ctx, &location); err != nil { + log.WithError(err).Error("Error updating BackupStorageLocation phase") } - - // updates the default backup location - location.Spec.Default = isDefault - - log.Info("Validating backup storage location") - err = backupStore.IsValid() }() - } - if !anyVerified { - log.Debug("No backup storage locations needed to be validated") - } + backupStore, err := r.BackupStoreGetter.Get(&location, pluginManager, log) + if err != nil { + log.WithError(err).Error("Error getting a backup store") + return + } + + log.Info("Validating BackupStorageLocation") + err = backupStore.IsValid() + if err != nil { + log.WithError(err).Error("fail to validate backup store") + return + } + + // updates the default backup location + location.Spec.Default = isDefault + }() r.logReconciledPhase(defaultFound, locationList, unavailableErrors) - return ctrl.Result{Requeue: true}, nil + return ctrl.Result{}, nil } func (r *BackupStorageLocationReconciler) logReconciledPhase(defaultFound bool, locationList velerov1api.BackupStorageLocationList, errs []string) { @@ -169,21 +172,48 @@ func (r *BackupStorageLocationReconciler) logReconciledPhase(defaultFound bool, if numUnavailable+numUnknown == len(locationList.Items) { // no available BSL if len(errs) > 0 { - log.Errorf("Current backup storage locations available/unavailable/unknown: %v/%v/%v, %s)", numAvailable, numUnavailable, numUnknown, strings.Join(errs, "; ")) + log.Errorf("Current BackupStorageLocations available/unavailable/unknown: %v/%v/%v, %s)", numAvailable, numUnavailable, numUnknown, strings.Join(errs, "; ")) } else { - log.Errorf("Current backup storage locations available/unavailable/unknown: %v/%v/%v)", numAvailable, numUnavailable, numUnknown) + log.Errorf("Current BackupStorageLocations available/unavailable/unknown: %v/%v/%v)", numAvailable, numUnavailable, numUnknown) } } else if numUnavailable > 0 { // some but not all BSL unavailable - log.Warnf("Unavailable backup storage locations detected: available/unavailable/unknown: %v/%v/%v, %s)", numAvailable, numUnavailable, numUnknown, strings.Join(errs, "; ")) + log.Warnf("Unavailable BackupStorageLocations detected: available/unavailable/unknown: %v/%v/%v, %s)", numAvailable, numUnavailable, numUnknown, strings.Join(errs, "; ")) } if !defaultFound { - log.Warn("There is no existing backup storage location set as default. Please see `velero backup-location -h` for options.") + log.Warn("There is no existing BackupStorageLocation set as default. Please see `velero backup-location -h` for options.") } } func (r *BackupStorageLocationReconciler) SetupWithManager(mgr ctrl.Manager) error { + g := kube.NewPeriodicalEnqueueSource( + r.Log, + mgr.GetClient(), + &velerov1api.BackupStorageLocationList{}, + backupStorageLocationSyncPeriod, + // Add filter function to enqueue BSL per ValidationFrequency setting. + func(object client.Object) bool { + location := object.(*velerov1api.BackupStorageLocation) + return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, r.DefaultBackupLocationInfo.ServerValidationFrequency, r.Log.WithField("controller", BackupStorageLocation)) + }, + ) return ctrl.NewControllerManagedBy(mgr). For(&velerov1api.BackupStorageLocation{}). + // Handle BSL's creation event and spec update event to let changed BSL got validation immediately. + WithEventFilter(predicate.Funcs{ + CreateFunc: func(ce event.CreateEvent) bool { + return true + }, + UpdateFunc: func(ue event.UpdateEvent) bool { + return ue.ObjectNew.GetGeneration() != ue.ObjectOld.GetGeneration() + }, + DeleteFunc: func(de event.DeleteEvent) bool { + return false + }, + GenericFunc: func(ge event.GenericEvent) bool { + return false + }, + }). + Watches(g, nil). Complete(r) } diff --git a/pkg/controller/backup_storage_location_controller_test.go b/pkg/controller/backup_storage_location_controller_test.go index 82a6cc241..75ad691a2 100644 --- a/pkg/controller/backup_storage_location_controller_test.go +++ b/pkg/controller/backup_storage_location_controller_test.go @@ -81,7 +81,7 @@ var _ = Describe("Backup Storage Location Reconciler", func() { Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed()) r := BackupStorageLocationReconciler{ Ctx: ctx, - Client: fake.NewFakeClientWithScheme(scheme.Scheme, locations), + Client: fake.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(locations).Build(), DefaultBackupLocationInfo: storage.DefaultBackupLocationInfo{ StorageLocation: "location-1", ServerValidationFrequency: 0, @@ -91,18 +91,17 @@ var _ = Describe("Backup Storage Location Reconciler", func() { Log: velerotest.NewLogger(), } - actualResult, err := r.Reconcile(ctx, ctrl.Request{ - NamespacedName: types.NamespacedName{Namespace: "ns-1"}, - }) - - Expect(actualResult).To(BeEquivalentTo(ctrl.Result{Requeue: true})) - Expect(err).To(BeNil()) - // Assertions for i, location := range locations.Items { + actualResult, err := r.Reconcile(ctx, ctrl.Request{ + NamespacedName: types.NamespacedName{Namespace: location.Namespace, Name: location.Name}, + }) + Expect(actualResult).To(BeEquivalentTo(ctrl.Result{})) + Expect(err).To(BeNil()) + key := client.ObjectKey{Name: location.Name, Namespace: location.Namespace} instance := &velerov1api.BackupStorageLocation{} - err := r.Client.Get(ctx, key, instance) + err = r.Client.Get(ctx, key, instance) Expect(err).To(BeNil()) Expect(instance.Spec.Default).To(BeIdenticalTo(tests[i].expectedIsDefault)) Expect(instance.Status.Phase).To(BeIdenticalTo(tests[i].expectedPhase)) @@ -147,7 +146,7 @@ var _ = Describe("Backup Storage Location Reconciler", func() { Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed()) r := BackupStorageLocationReconciler{ Ctx: ctx, - Client: fake.NewFakeClientWithScheme(scheme.Scheme, locations), + Client: fake.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(locations).Build(), DefaultBackupLocationInfo: storage.DefaultBackupLocationInfo{ StorageLocation: "default", ServerValidationFrequency: 0, @@ -157,91 +156,19 @@ var _ = Describe("Backup Storage Location Reconciler", func() { Log: velerotest.NewLogger(), } - actualResult, err := r.Reconcile(ctx, ctrl.Request{ - NamespacedName: types.NamespacedName{Namespace: "ns-1"}, - }) - - Expect(actualResult).To(BeEquivalentTo(ctrl.Result{Requeue: true})) - Expect(err).To(BeNil()) - // Assertions for i, location := range locations.Items { + actualResult, err := r.Reconcile(ctx, ctrl.Request{ + NamespacedName: types.NamespacedName{Namespace: location.Namespace, Name: location.Name}, + }) + Expect(actualResult).To(BeEquivalentTo(ctrl.Result{})) + Expect(err).To(BeNil()) + key := client.ObjectKey{Name: location.Name, Namespace: location.Namespace} instance := &velerov1api.BackupStorageLocation{} - err := r.Client.Get(ctx, key, instance) + err = r.Client.Get(ctx, key, instance) Expect(err).To(BeNil()) Expect(instance.Spec.Default).To(BeIdenticalTo(tests[i].expectedIsDefault)) } }) - - It("Should not patch a backup storage location object status phase if the location's validation frequency is specifically set to zero", func() { - tests := []struct { - backupLocation *velerov1api.BackupStorageLocation - isValidError error - expectedIsDefault bool - expectedPhase velerov1api.BackupStorageLocationPhase - wantErr bool - }{ - { - backupLocation: builder.ForBackupStorageLocation("ns-1", "location-1").ValidationFrequency(0).LastValidationTime(time.Now()).Result(), - isValidError: nil, - expectedIsDefault: false, - expectedPhase: "", - wantErr: false, - }, - { - backupLocation: builder.ForBackupStorageLocation("ns-1", "location-2").ValidationFrequency(0).LastValidationTime(time.Now()).Result(), - isValidError: nil, - expectedIsDefault: false, - expectedPhase: "", - wantErr: false, - }, - } - - // Setup - var ( - pluginManager = &pluginmocks.Manager{} - backupStores = make(map[string]*persistencemocks.BackupStore) - ) - pluginManager.On("CleanupClients").Return(nil) - - locations := new(velerov1api.BackupStorageLocationList) - for i, test := range tests { - location := test.backupLocation - locations.Items = append(locations.Items, *location) - backupStores[location.Name] = &persistencemocks.BackupStore{} - backupStores[location.Name].On("IsValid").Return(tests[i].isValidError) - } - - // Setup reconciler - Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed()) - r := BackupStorageLocationReconciler{ - Ctx: ctx, - Client: fake.NewFakeClientWithScheme(scheme.Scheme, locations), - DefaultBackupLocationInfo: storage.DefaultBackupLocationInfo{ - StorageLocation: "default", - ServerValidationFrequency: 0, - }, - NewPluginManager: func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager }, - BackupStoreGetter: NewFakeObjectBackupStoreGetter(backupStores), - Log: velerotest.NewLogger(), - } - - actualResult, err := r.Reconcile(ctx, ctrl.Request{ - NamespacedName: types.NamespacedName{Namespace: "ns-1"}, - }) - - Expect(actualResult).To(BeEquivalentTo(ctrl.Result{Requeue: true})) - Expect(err).To(BeNil()) - - // Assertions - for i, location := range locations.Items { - key := client.ObjectKey{Name: location.Name, Namespace: location.Namespace} - instance := &velerov1api.BackupStorageLocation{} - err := r.Client.Get(ctx, key, instance) - Expect(err).To(BeNil()) - Expect(instance.Spec.Default).To(BeIdenticalTo(tests[i].expectedIsDefault)) - Expect(instance.Status.Phase).To(BeIdenticalTo(tests[i].expectedPhase)) - } - }) }) diff --git a/pkg/util/kube/periodical_enqueue_source.go b/pkg/util/kube/periodical_enqueue_source.go index bb89295b4..20b658c61 100644 --- a/pkg/util/kube/periodical_enqueue_source.go +++ b/pkg/util/kube/periodical_enqueue_source.go @@ -34,12 +34,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" ) -func NewPeriodicalEnqueueSource(logger logrus.FieldLogger, client client.Client, objList client.ObjectList, period time.Duration) *PeriodicalEnqueueSource { +func NewPeriodicalEnqueueSource(logger logrus.FieldLogger, client client.Client, objList client.ObjectList, period time.Duration, filters ...func(object client.Object) bool) *PeriodicalEnqueueSource { return &PeriodicalEnqueueSource{ - logger: logger.WithField("resource", reflect.TypeOf(objList).String()), - Client: client, - objList: objList, - period: period, + logger: logger.WithField("resource", reflect.TypeOf(objList).String()), + Client: client, + objList: objList, + period: period, + filterFuncs: filters, } } @@ -48,9 +49,10 @@ func NewPeriodicalEnqueueSource(logger logrus.FieldLogger, client client.Client, // the reconcile logic periodically type PeriodicalEnqueueSource struct { client.Client - logger logrus.FieldLogger - objList client.ObjectList - period time.Duration + logger logrus.FieldLogger + objList client.ObjectList + period time.Duration + filterFuncs []func(object client.Object) bool } func (p *PeriodicalEnqueueSource) Start(ctx context.Context, h handler.EventHandler, q workqueue.RateLimitingInterface, pre ...predicate.Predicate) error { @@ -70,6 +72,14 @@ func (p *PeriodicalEnqueueSource) Start(ctx context.Context, h handler.EventHand p.logger.Error("%s's type isn't metav1.Object", object.GetObjectKind().GroupVersionKind().String()) return nil } + for _, filter := range p.filterFuncs { + if filter != nil { + if enqueueObj := filter(object.(client.Object)); !enqueueObj { + p.logger.Debugf("skip enqueue object %s/%s due to filter function.", obj.GetNamespace(), obj.GetName()) + return nil + } + } + } q.Add(ctrl.Request{ NamespacedName: types.NamespacedName{ Namespace: obj.GetNamespace(), diff --git a/pkg/util/kube/periodical_enqueue_source_test.go b/pkg/util/kube/periodical_enqueue_source_test.go index 2264729f7..362153347 100644 --- a/pkg/util/kube/periodical_enqueue_source_test.go +++ b/pkg/util/kube/periodical_enqueue_source_test.go @@ -26,8 +26,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/util/workqueue" + crclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "github.com/vmware-tanzu/velero/internal/storage" velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" ) @@ -43,7 +45,7 @@ func TestStart(t *testing.T) { // no resources time.Sleep(1 * time.Second) - require.Equal(t, queue.Len(), 0) + require.Equal(t, 0, queue.Len()) // contain one resource require.Nil(t, client.Create(ctx, &velerov1.Schedule{ @@ -52,13 +54,53 @@ func TestStart(t *testing.T) { }, })) time.Sleep(2 * time.Second) - require.Equal(t, queue.Len(), 1) + require.Equal(t, 1, queue.Len()) // context canceled, the enqueue source shouldn't run anymore item, _ := queue.Get() queue.Forget(item) - require.Equal(t, queue.Len(), 0) + require.Equal(t, 0, queue.Len()) cancelFunc() time.Sleep(2 * time.Second) - require.Equal(t, queue.Len(), 0) + require.Equal(t, 0, queue.Len()) +} + +func TestFilter(t *testing.T) { + require.Nil(t, velerov1.AddToScheme(scheme.Scheme)) + + ctx, cancelFunc := context.WithCancel(context.TODO()) + client := (&fake.ClientBuilder{}).Build() + queue := workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter()) + source := NewPeriodicalEnqueueSource( + logrus.WithContext(ctx), + client, + &velerov1.BackupStorageLocationList{}, + 1*time.Second, + func(object crclient.Object) bool { + location := object.(*velerov1.BackupStorageLocation) + return storage.IsReadyToValidate(location.Spec.ValidationFrequency, location.Status.LastValidationTime, 1*time.Minute, logrus.WithContext(ctx).WithField("BackupStorageLocation", location.Name)) + }, + ) + + require.Nil(t, source.Start(ctx, nil, queue)) + + // Should not patch a backup storage location object status phase + // if the location's validation frequency is specifically set to zero + require.Nil(t, client.Create(ctx, &velerov1.BackupStorageLocation{ + ObjectMeta: metav1.ObjectMeta{ + Name: "location1", + Namespace: "default", + }, + Spec: velerov1.BackupStorageLocationSpec{ + ValidationFrequency: &metav1.Duration{Duration: 0}, + }, + Status: velerov1.BackupStorageLocationStatus{ + LastValidationTime: &metav1.Time{Time: time.Now()}, + }, + })) + time.Sleep(2 * time.Second) + + require.Equal(t, 0, queue.Len()) + + cancelFunc() }