diff --git a/changelogs/unreleased/5215-allenxu404 b/changelogs/unreleased/5215-allenxu404 new file mode 100644 index 000000000..48275bb25 --- /dev/null +++ b/changelogs/unreleased/5215-allenxu404 @@ -0,0 +1 @@ +Refactor GCController with kubebuilder \ No newline at end of file diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 96dc264e8..c05dc75ef 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -674,22 +674,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string } } - gcControllerRunInfo := func() controllerRunInfo { - gcController := controller.NewGCController( - s.logger, - s.sharedInformerFactory.Velero().V1().Backups(), - s.sharedInformerFactory.Velero().V1().DeleteBackupRequests().Lister(), - s.veleroClient.VeleroV1(), - s.mgr.GetClient(), - s.config.garbageCollectionFrequency, - ) - - return controllerRunInfo{ - controller: gcController, - numWorkers: defaultControllerWorkers, - } - } - restoreControllerRunInfo := func() controllerRunInfo { restorer, err := restore.NewKubernetesRestorer( s.veleroClient.VeleroV1(), @@ -731,15 +715,15 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string } enabledControllers := map[string]func() controllerRunInfo{ - controller.BackupSync: backupSyncControllerRunInfo, - controller.Backup: backupControllerRunInfo, - controller.GarbageCollection: gcControllerRunInfo, - controller.Restore: restoreControllerRunInfo, + controller.BackupSync: backupSyncControllerRunInfo, + controller.Backup: backupControllerRunInfo, + controller.Restore: restoreControllerRunInfo, } // Note: all runtime type controllers that can be disabled are grouped separately, below: enabledRuntimeControllers := make(map[string]struct{}) enabledRuntimeControllers[controller.ServerStatusRequest] = struct{}{} enabledRuntimeControllers[controller.DownloadRequest] = struct{}{} + enabledRuntimeControllers[controller.GarbageCollection] = struct{}{} if 
s.config.restoreOnly { s.logger.Info("Restore only mode - not starting the backup, schedule, delete-backup, or GC controllers") @@ -851,6 +835,13 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string } } + if _, ok := enabledRuntimeControllers[controller.GarbageCollection]; ok { + r := controller.NewGCReconciler(s.logger, s.mgr.GetClient()) + if err := r.SetupWithManager(s.mgr); err != nil { + s.logger.Fatal(err, "unable to create controller", "controller", controller.GarbageCollection) + } + } + // TODO(2.0): presuming all controllers and resources are converted to runtime-controller // by v2.0, the block from this line and including the `s.mgr.Start() will be // deprecated, since the manager auto-starts all the caches. Until then, we need to start the diff --git a/pkg/controller/gc_controller.go b/pkg/controller/gc_controller.go index b8297aa2a..c12ba45d9 100644 --- a/pkg/controller/gc_controller.go +++ b/pkg/controller/gc_controller.go @@ -23,19 +23,16 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/clock" - "k8s.io/client-go/tools/cache" - + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" pkgbackup "github.com/vmware-tanzu/velero/pkg/backup" - velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1" - velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1" - velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1" "github.com/vmware-tanzu/velero/pkg/label" + "github.com/vmware-tanzu/velero/pkg/util/kube" ) const ( @@ -46,100 +43,77 @@ const ( 
gcFailureBSLReadOnly = "BSLReadOnly" ) -// gcController creates DeleteBackupRequests for expired backups. -type gcController struct { - *genericController - - backupLister velerov1listers.BackupLister - deleteBackupRequestLister velerov1listers.DeleteBackupRequestLister - deleteBackupRequestClient velerov1client.DeleteBackupRequestsGetter - kbClient client.Client - frequency time.Duration - - clock clock.Clock +// gcReconciler creates DeleteBackupRequests for expired backups. +type gcReconciler struct { + client.Client + logger logrus.FieldLogger + clock clock.Clock } -// NewGCController constructs a new gcController. -func NewGCController( +// NewGCReconciler constructs a new gcReconciler. +func NewGCReconciler( logger logrus.FieldLogger, - backupInformer velerov1informers.BackupInformer, - deleteBackupRequestLister velerov1listers.DeleteBackupRequestLister, - deleteBackupRequestClient velerov1client.DeleteBackupRequestsGetter, - kbClient client.Client, - frequency time.Duration, -) Interface { - c := &gcController{ - genericController: newGenericController(GarbageCollection, logger), - clock: clock.RealClock{}, - backupLister: backupInformer.Lister(), - deleteBackupRequestLister: deleteBackupRequestLister, - deleteBackupRequestClient: deleteBackupRequestClient, - kbClient: kbClient, - } - - c.syncHandler = c.processQueueItem - c.resyncPeriod = frequency - if c.resyncPeriod <= 0 { - c.resyncPeriod = defaultGCFrequency - } - logger.Infof("Garbage collection frequency: %s", c.resyncPeriod.String()) - c.resyncFunc = c.enqueueAllBackups - - backupInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: c.enqueue, - UpdateFunc: func(_, obj interface{}) { c.enqueue(obj) }, - }, - ) - - return c -} - -// enqueueAllBackups lists all backups from cache and enqueues all of them so we can check each one -// for expiration. 
-func (c *gcController) enqueueAllBackups() { - c.logger.Debug("gcController.enqueueAllBackups") - - backups, err := c.backupLister.List(labels.Everything()) - if err != nil { - c.logger.WithError(errors.WithStack(err)).Error("error listing backups") - return - } - - for _, backup := range backups { - c.enqueue(backup) + client client.Client, +) *gcReconciler { + return &gcReconciler{ + Client: client, + logger: logger, + clock: clock.RealClock{}, } } -func (c *gcController) processQueueItem(key string) error { - log := c.logger.WithField("backup", key) +// GCController only watches CreateEvents, to ensure every new backup is taken care of. +// Other events are filtered out to decrease the number of reconcile calls. UpdateEvents in particular must be filtered because, since we removed +// the backup status as a sub-resource of backup in v1.9, every change to it is treated as an UpdateEvent and triggers a reconcile call. +func (c *gcReconciler) SetupWithManager(mgr ctrl.Manager) error { + s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1api.BackupList{}, defaultGCFrequency) + return ctrl.NewControllerManagedBy(mgr). + For(&velerov1api.Backup{}). + WithEventFilter(predicate.Funcs{ + UpdateFunc: func(ue event.UpdateEvent) bool { + return false + }, + DeleteFunc: func(de event.DeleteEvent) bool { + return false + }, + GenericFunc: func(ge event.GenericEvent) bool { + return false + }, + }). + Watches(s, nil). 
+ Complete(c) +} - ns, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - return errors.Wrap(err, "error splitting queue key") - } +// +kubebuilder:rbac:groups=velero.io,resources=backups,verbs=get;list;watch;update +// +kubebuilder:rbac:groups=velero.io,resources=backups/status,verbs=get +// +kubebuilder:rbac:groups=velero.io,resources=deletebackuprequests,verbs=get;list;watch;create; +// +kubebuilder:rbac:groups=velero.io,resources=deletebackuprequests/status,verbs=get +// +kubebuilder:rbac:groups=velero.io,resources=backupstoragelocations,verbs=get +func (c *gcReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := c.logger.WithField("gc backup", req.String()) + log.Debug("gcController getting backup") - backup, err := c.backupLister.Backups(ns).Get(name) - if apierrors.IsNotFound(err) { - log.Debug("Unable to find backup") - return nil - } - if err != nil { - return errors.Wrap(err, "error getting backup") + backup := &velerov1api.Backup{} + if err := c.Get(ctx, req.NamespacedName, backup); err != nil { + if apierrors.IsNotFound(err) { + log.WithError(err).Error("backup not found") + return ctrl.Result{}, nil + } + return ctrl.Result{}, errors.Wrapf(err, "error getting backup %s", req.String()) } + log.Debugf("backup: %v", backup) log = c.logger.WithFields( logrus.Fields{ - "backup": key, + "backup": req.String(), "expiration": backup.Status.Expiration, }, ) now := c.clock.Now() - if backup.Status.Expiration == nil || backup.Status.Expiration.After(now) { log.Debug("Backup has not expired yet, skipping") - return nil + return ctrl.Result{}, nil } log.Info("Backup has expired") @@ -149,8 +123,8 @@ func (c *gcController) processQueueItem(key string) error { } loc := &velerov1api.BackupStorageLocation{} - if err := c.kbClient.Get(context.Background(), client.ObjectKey{ - Namespace: ns, + if err := c.Get(ctx, client.ObjectKey{ + Namespace: req.Namespace, Name: backup.Spec.StorageLocation, }, loc); err != nil { if 
apierrors.IsNotFound(err) { @@ -159,53 +133,56 @@ func (c *gcController) processQueueItem(key string) error { } else { backup.Labels[garbageCollectionFailure] = gcFailureBSLCannotGet } - if err := c.kbClient.Update(context.Background(), backup); err != nil { + if err := c.Update(ctx, backup); err != nil { log.WithError(err).Error("error updating backup labels") } - return errors.Wrap(err, "error getting backup storage location") + return ctrl.Result{}, errors.Wrap(err, "error getting backup storage location") } if loc.Spec.AccessMode == velerov1api.BackupStorageLocationAccessModeReadOnly { log.Infof("Backup cannot be garbage-collected because backup storage location %s is currently in read-only mode", loc.Name) backup.Labels[garbageCollectionFailure] = gcFailureBSLReadOnly - if err := c.kbClient.Update(context.Background(), backup); err != nil { + if err := c.Update(ctx, backup); err != nil { log.WithError(err).Error("error updating backup labels") } - return nil + return ctrl.Result{}, nil } // remove gc fail error label after this point delete(backup.Labels, garbageCollectionFailure) - if err := c.kbClient.Update(context.Background(), backup); err != nil { + if err := c.Update(ctx, backup); err != nil { log.WithError(err).Error("error updating backup labels") } - selector := labels.SelectorFromSet(labels.Set(map[string]string{ + selector := client.MatchingLabels{ velerov1api.BackupNameLabel: label.GetValidName(backup.Name), velerov1api.BackupUIDLabel: string(backup.UID), - })) - - dbrs, err := c.deleteBackupRequestLister.DeleteBackupRequests(ns).List(selector) - if err != nil { - return errors.Wrap(err, "error listing existing DeleteBackupRequests for backup") } + dbrs := &velerov1api.DeleteBackupRequestList{} + if err := c.List(ctx, dbrs, selector); err != nil { + log.WithError(err).Error("error listing DeleteBackupRequests") + return ctrl.Result{}, errors.Wrap(err, "error listing existing DeleteBackupRequests for backup") + } + log.Debugf("length of dbrs:%d", 
len(dbrs.Items)) + // if there's an existing unprocessed deletion request for this backup, don't create // another one - for _, dbr := range dbrs { + for _, dbr := range dbrs.Items { switch dbr.Status.Phase { case "", velerov1api.DeleteBackupRequestPhaseNew, velerov1api.DeleteBackupRequestPhaseInProgress: log.Info("Backup already has a pending deletion request") - return nil + return ctrl.Result{}, nil } } log.Info("Creating a new deletion request") - req := pkgbackup.NewDeleteBackupRequest(backup.Name, string(backup.UID)) - - if _, err = c.deleteBackupRequestClient.DeleteBackupRequests(ns).Create(context.TODO(), req, metav1.CreateOptions{}); err != nil { - return errors.Wrap(err, "error creating DeleteBackupRequest") + ndbr := pkgbackup.NewDeleteBackupRequest(backup.Name, string(backup.UID)) + ndbr.SetNamespace(backup.Namespace) + if err := c.Create(ctx, ndbr); err != nil { + log.WithError(err).Error("error creating DeleteBackupRequests") + return ctrl.Result{}, errors.Wrap(err, "error creating DeleteBackupRequest") } - return nil + return ctrl.Result{}, nil } diff --git a/pkg/controller/gc_controller_test.go b/pkg/controller/gc_controller_test.go index c373064e4..8985f3be1 100644 --- a/pkg/controller/gc_controller_test.go +++ b/pkg/controller/gc_controller_test.go @@ -18,152 +18,41 @@ package controller import ( "context" - "fmt" - "sort" "testing" "time" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" - "k8s.io/apimachinery/pkg/watch" - core "k8s.io/client-go/testing" - + ctrl "sigs.k8s.io/controller-runtime" kbclient "sigs.k8s.io/controller-runtime/pkg/client" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/builder" - "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake" - informers 
"github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions" velerotest "github.com/vmware-tanzu/velero/pkg/test" - "github.com/vmware-tanzu/velero/pkg/util/kube" ) -func TestGCControllerEnqueueAllBackups(t *testing.T) { - var ( - client = fake.NewSimpleClientset() - sharedInformers = informers.NewSharedInformerFactory(client, 0) - - controller = NewGCController( - velerotest.NewLogger(), - sharedInformers.Velero().V1().Backups(), - sharedInformers.Velero().V1().DeleteBackupRequests().Lister(), - client.VeleroV1(), - nil, - defaultGCFrequency, - ).(*gcController) - ) - - keys := make(chan string) - - controller.syncHandler = func(key string) error { - keys <- key - return nil - } - - var expected []string - - for i := 0; i < 3; i++ { - backup := builder.ForBackup(velerov1api.DefaultNamespace, fmt.Sprintf("backup-%d", i)).Result() - sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(backup) - expected = append(expected, kube.NamespaceAndName(backup)) - } - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - go controller.Run(ctx, 1) - - var received []string - -Loop: - for { - select { - case <-ctx.Done(): - t.Fatal("test timed out") - case key := <-keys: - received = append(received, key) - if len(received) == len(expected) { - break Loop - } - } - } - - sort.Strings(expected) - sort.Strings(received) - assert.Equal(t, expected, received) -} - -func TestGCControllerHasUpdateFunc(t *testing.T) { - backup := defaultBackup().Result() - expected := kube.NamespaceAndName(backup) - - client := fake.NewSimpleClientset(backup) - - fakeWatch := watch.NewFake() - defer fakeWatch.Stop() - client.PrependWatchReactor("backups", core.DefaultWatchReactor(fakeWatch, nil)) - - sharedInformers := informers.NewSharedInformerFactory(client, 0) - - controller := NewGCController( +func mockGCReconciler(fakeClient kbclient.Client, fakeClock *clock.FakeClock) *gcReconciler { + gcr := NewGCReconciler( 
velerotest.NewLogger(), - sharedInformers.Velero().V1().Backups(), - sharedInformers.Velero().V1().DeleteBackupRequests().Lister(), - client.VeleroV1(), - nil, - defaultGCFrequency, - ).(*gcController) - - keys := make(chan string) - - controller.syncHandler = func(key string) error { - keys <- key - return nil - } - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - go sharedInformers.Start(ctx.Done()) - go controller.Run(ctx, 1) - - // wait for the AddFunc - select { - case <-ctx.Done(): - t.Fatal("test timed out waiting for AddFunc") - case key := <-keys: - assert.Equal(t, expected, key) - } - - backup.Status.Version = 1234 - fakeWatch.Add(backup) - - // wait for the UpdateFunc - select { - case <-ctx.Done(): - t.Fatal("test timed out waiting for UpdateFunc") - case key := <-keys: - assert.Equal(t, expected, key) - } + fakeClient, + ) + gcr.clock = fakeClock + return gcr } -func TestGCControllerProcessQueueItem(t *testing.T) { - +func TestGCReconcile(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) - defaultBackupLocation := builder.ForBackupStorageLocation("velero", "default").Result() + defaultBackupLocation := builder.ForBackupStorageLocation(velerov1api.DefaultNamespace, "default").Result() tests := []struct { - name string - backup *velerov1api.Backup - deleteBackupRequests []*velerov1api.DeleteBackupRequest - backupLocation *velerov1api.BackupStorageLocation - expectDeletion bool - createDeleteBackupRequestError bool - expectError bool + name string + backup *velerov1api.Backup + deleteBackupRequests []*velerov1api.DeleteBackupRequest + backupLocation *velerov1api.BackupStorageLocation + expectError bool }{ { name: "can't find backup - no error", @@ -172,25 +61,21 @@ func TestGCControllerProcessQueueItem(t *testing.T) { name: "unexpired backup is not deleted", backup: defaultBackup().Expiration(fakeClock.Now().Add(time.Minute)).StorageLocation("default").Result(), backupLocation: 
defaultBackupLocation, - expectDeletion: false, }, { name: "expired backup in read-only storage location is not deleted", backup: defaultBackup().Expiration(fakeClock.Now().Add(-time.Minute)).StorageLocation("read-only").Result(), backupLocation: builder.ForBackupStorageLocation("velero", "read-only").AccessMode(velerov1api.BackupStorageLocationAccessModeReadOnly).Result(), - expectDeletion: false, }, { name: "expired backup in read-write storage location is deleted", backup: defaultBackup().Expiration(fakeClock.Now().Add(-time.Minute)).StorageLocation("read-write").Result(), backupLocation: builder.ForBackupStorageLocation("velero", "read-write").AccessMode(velerov1api.BackupStorageLocationAccessModeReadWrite).Result(), - expectDeletion: true, }, { name: "expired backup with no pending deletion requests is deleted", backup: defaultBackup().Expiration(fakeClock.Now().Add(-time.Second)).StorageLocation("default").Result(), backupLocation: defaultBackupLocation, - expectDeletion: true, }, { name: "expired backup with a pending deletion request is not deleted", @@ -211,7 +96,6 @@ func TestGCControllerProcessQueueItem(t *testing.T) { }, }, }, - expectDeletion: false, }, { name: "expired backup with only processed deletion requests is deleted", @@ -232,72 +116,31 @@ func TestGCControllerProcessQueueItem(t *testing.T) { }, }, }, - expectDeletion: true, - }, - { - name: "create DeleteBackupRequest error returns an error", - backup: defaultBackup().Expiration(fakeClock.Now().Add(-time.Second)).StorageLocation("default").Result(), - backupLocation: defaultBackupLocation, - expectDeletion: true, - createDeleteBackupRequestError: true, - expectError: true, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - var ( - client = fake.NewSimpleClientset() - sharedInformers = informers.NewSharedInformerFactory(client, 0) - ) - - var fakeClient kbclient.Client - if test.backupLocation != nil { - fakeClient = velerotest.NewFakeControllerRuntimeClient(t, 
test.backupLocation) - } else { - fakeClient = velerotest.NewFakeControllerRuntimeClient(t) + if test.backup == nil { + return } - controller := NewGCController( - velerotest.NewLogger(), - sharedInformers.Velero().V1().Backups(), - sharedInformers.Velero().V1().DeleteBackupRequests().Lister(), - client.VeleroV1(), - fakeClient, - defaultGCFrequency, - ).(*gcController) - controller.clock = fakeClock + initObjs := []runtime.Object{} + initObjs = append(initObjs, test.backup) - var key string - if test.backup != nil { - key = kube.NamespaceAndName(test.backup) - sharedInformers.Velero().V1().Backups().Informer().GetStore().Add(test.backup) + if test.backupLocation != nil { + initObjs = append(initObjs, test.backupLocation) } for _, dbr := range test.deleteBackupRequests { - sharedInformers.Velero().V1().DeleteBackupRequests().Informer().GetStore().Add(dbr) + initObjs = append(initObjs, dbr) } - if test.createDeleteBackupRequestError { - client.PrependReactor("create", "deletebackuprequests", func(action core.Action) (bool, runtime.Object, error) { - return true, nil, errors.New("foo") - }) - } - - err := controller.processQueueItem(key) + fakeClient := velerotest.NewFakeControllerRuntimeClient(t, initObjs...) + reconciler := mockGCReconciler(fakeClient, fakeClock) + _, err := reconciler.Reconcile(context.TODO(), ctrl.Request{NamespacedName: types.NamespacedName{Namespace: test.backup.Namespace, Name: test.backup.Name}}) gotErr := err != nil assert.Equal(t, test.expectError, gotErr) - - if test.expectDeletion { - require.Len(t, client.Actions(), 1) - - createAction, ok := client.Actions()[0].(core.CreateAction) - require.True(t, ok) - - assert.Equal(t, "deletebackuprequests", createAction.GetResource().Resource) - } else { - assert.Len(t, client.Actions(), 0) - } }) } }