From 53c3f4b436257cbd8d5f92781ab7901b423990fe Mon Sep 17 00:00:00 2001 From: lyndon <98304688+Lyndon-Li@users.noreply.github.com> Date: Tue, 7 Feb 2023 17:47:25 +0800 Subject: [PATCH] Issue fix 5226 (#5768) * fix issue 5226 Signed-off-by: Lyndon-Li --- changelogs/unreleased/5768-Lyndon-Li | 1 + .../backup_repository_controller.go | 128 ++++++++++++++-- .../backup_repository_controller_test.go | 144 +++++++++++++++++- pkg/util/kube/event_handler.go | 75 +++++++++ pkg/util/kube/pod.go | 2 +- pkg/util/kube/predicate.go | 19 +++ 6 files changed, 351 insertions(+), 18 deletions(-) create mode 100644 changelogs/unreleased/5768-Lyndon-Li create mode 100644 pkg/util/kube/event_handler.go diff --git a/changelogs/unreleased/5768-Lyndon-Li b/changelogs/unreleased/5768-Lyndon-Li new file mode 100644 index 000000000..e615bcac2 --- /dev/null +++ b/changelogs/unreleased/5768-Lyndon-Li @@ -0,0 +1 @@ +Fix issue 5226, invalidate the related backup repositories whenever the backup storage info changes in BSL \ No newline at end of file diff --git a/pkg/controller/backup_repository_controller.go b/pkg/controller/backup_repository_controller.go index f151a1b68..3252cb1ba 100644 --- a/pkg/controller/backup_repository_controller.go +++ b/pkg/controller/backup_repository_controller.go @@ -17,18 +17,25 @@ limitations under the License. 
package controller import ( + "bytes" "context" + "reflect" "time" "github.com/pkg/errors" "github.com/sirupsen/logrus" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/clock" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/label" "github.com/vmware-tanzu/velero/pkg/repository" repoconfig "github.com/vmware-tanzu/velero/pkg/repository/config" "github.com/vmware-tanzu/velero/pkg/util/kube" @@ -64,12 +71,88 @@ func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client func (r *BackupRepoReconciler) SetupWithManager(mgr ctrl.Manager) error { s := kube.NewPeriodicalEnqueueSource(r.logger, mgr.GetClient(), &velerov1api.BackupRepositoryList{}, repoSyncPeriod, kube.PeriodicalEnqueueSourceOption{}) + return ctrl.NewControllerManagedBy(mgr). For(&velerov1api.BackupRepository{}). Watches(s, nil). + Watches(&source.Kind{Type: &velerov1api.BackupStorageLocation{}}, kube.EnqueueRequestsFromMapUpdateFunc(r.invalidateBackupReposForBSL), + builder.WithPredicates(kube.NewUpdateEventPredicate(r.needInvalidBackupRepo))). 
Complete(r) } +func (r *BackupRepoReconciler) invalidateBackupReposForBSL(bslObj client.Object) []reconcile.Request { + bsl := bslObj.(*velerov1api.BackupStorageLocation) + + list := &velerov1api.BackupRepositoryList{} + options := &client.ListOptions{ + LabelSelector: labels.Set(map[string]string{ + velerov1api.StorageLocationLabel: label.GetValidName(bsl.Name), + }).AsSelector(), + } + if err := r.List(context.TODO(), list, options); err != nil { + r.logger.WithField("BSL", bsl.Name).WithError(err).Error("unable to list BackupRepositorys") + return []reconcile.Request{} + } + + for i := range list.Items { + r.logger.WithField("BSL", bsl.Name).Infof("Invalidating Backup Repository %s", list.Items[i].Name) + r.patchBackupRepository(context.Background(), &list.Items[i], repoNotReady("re-establish on BSL change")) + } + + return []reconcile.Request{} +} + +func (r *BackupRepoReconciler) needInvalidBackupRepo(oldObj client.Object, newObj client.Object) bool { + oldBSL := oldObj.(*velerov1api.BackupStorageLocation) + newBSL := newObj.(*velerov1api.BackupStorageLocation) + + oldStorage := oldBSL.Spec.StorageType.ObjectStorage + newStorage := newBSL.Spec.StorageType.ObjectStorage + oldConfig := oldBSL.Spec.Config + newConfig := newBSL.Spec.Config + + if oldStorage == nil { + oldStorage = &velerov1api.ObjectStorageLocation{} + } + + if newStorage == nil { + newStorage = &velerov1api.ObjectStorageLocation{} + } + + logger := r.logger.WithField("BSL", newBSL.Name) + + if oldStorage.Bucket != newStorage.Bucket { + logger.WithFields(logrus.Fields{ + "old bucket": oldStorage.Bucket, + "new bucket": newStorage.Bucket, + }).Info("BSL's bucket has changed, invalid backup repositories") + + return true + } + + if oldStorage.Prefix != newStorage.Prefix { + logger.WithFields(logrus.Fields{ + "old prefix": oldStorage.Prefix, + "new prefix": newStorage.Prefix, + }).Info("BSL's prefix has changed, invalid backup repositories") + + return true + } + + if !bytes.Equal(oldStorage.CACert, 
newStorage.CACert) { + logger.Info("BSL's CACert has changed, invalid backup repositories") + return true + } + + if !reflect.DeepEqual(oldConfig, newConfig) { + logger.Info("BSL's storage config has changed, invalid backup repositories") + + return true + } + + return false +} + func (r *BackupRepoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := r.logger.WithField("backupRepo", req.String()) backupRepo := &velerov1api.BackupRepository{} @@ -109,20 +192,29 @@ func (r *BackupRepoReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } +func (r *BackupRepoReconciler) getIdentiferByBSL(ctx context.Context, req *velerov1api.BackupRepository) (string, error) { + loc := &velerov1api.BackupStorageLocation{} + + if err := r.Get(ctx, client.ObjectKey{ + Namespace: req.Namespace, + Name: req.Spec.BackupStorageLocation, + }, loc); err != nil { + return "", errors.Wrapf(err, "error to get BSL %s", req.Spec.BackupStorageLocation) + } + + repoIdentifier, err := repoconfig.GetRepoIdentifier(loc, req.Spec.VolumeNamespace) + if err != nil { + return "", errors.Wrapf(err, "error to get identifier for repo %s", req.Name) + } + + return repoIdentifier, nil +} + func (r *BackupRepoReconciler) initializeRepo(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error { log.Info("Initializing backup repository") // confirm the repo's BackupStorageLocation is valid - loc := &velerov1api.BackupStorageLocation{} - - if err := r.Get(context.Background(), client.ObjectKey{ - Namespace: req.Namespace, - Name: req.Spec.BackupStorageLocation, - }, loc); err != nil { - return r.patchBackupRepository(ctx, req, repoNotReady(err.Error())) - } - - repoIdentifier, err := repoconfig.GetRepoIdentifier(loc, req.Spec.VolumeNamespace) + repoIdentifier, err := r.getIdentiferByBSL(ctx, req) if err != nil { return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) { rr.Status.Message = 
err.Error() @@ -210,12 +302,20 @@ func dueForMaintenance(req *velerov1api.BackupRepository, now time.Time) bool { } func (r *BackupRepoReconciler) checkNotReadyRepo(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error { - // no identifier: can't possibly be ready, so just return - if req.Spec.ResticIdentifier == "" { - return nil + log.Info("Checking backup repository for readiness") + + repoIdentifier, err := r.getIdentiferByBSL(ctx, req) + if err != nil { + return r.patchBackupRepository(ctx, req, repoNotReady(err.Error())) } - log.Info("Checking backup repository for readiness") + if repoIdentifier != req.Spec.ResticIdentifier { + if err := r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) { + rr.Spec.ResticIdentifier = repoIdentifier + }); err != nil { + return err + } + } // we need to ensure it (first check, if check fails, attempt to init) // because we don't know if it's been successfully initialized yet. diff --git a/pkg/controller/backup_repository_controller_test.go b/pkg/controller/backup_repository_controller_test.go index 3288ee68d..382f40162 100644 --- a/pkg/controller/backup_repository_controller_test.go +++ b/pkg/controller/backup_repository_controller_test.go @@ -75,16 +75,28 @@ func TestPatchBackupRepository(t *testing.T) { func TestCheckNotReadyRepo(t *testing.T) { rr := mockBackupRepositoryCR() + rr.Spec.BackupStorageLocation = "default" + rr.Spec.ResticIdentifier = "fake-identifier" + rr.Spec.VolumeNamespace = "volume-ns-1" reconciler := mockBackupRepoReconciler(t, rr, "PrepareRepo", rr, nil) err := reconciler.Client.Create(context.TODO(), rr) assert.NoError(t, err) - err = reconciler.checkNotReadyRepo(context.TODO(), rr, reconciler.logger) + locations := &velerov1api.BackupStorageLocation{ + Spec: velerov1api.BackupStorageLocationSpec{ + Config: map[string]string{"resticRepoPrefix": "s3:test.amazonaws.com/bucket/restic"}, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: 
velerov1api.DefaultNamespace, + Name: rr.Spec.BackupStorageLocation, + }, + } + + err = reconciler.Client.Create(context.TODO(), locations) assert.NoError(t, err) - assert.Equal(t, rr.Status.Phase, velerov1api.BackupRepositoryPhase("")) - rr.Spec.ResticIdentifier = "s3:test.amazonaws.com/bucket/restic" err = reconciler.checkNotReadyRepo(context.TODO(), rr, reconciler.logger) assert.NoError(t, err) assert.Equal(t, rr.Status.Phase, velerov1api.BackupRepositoryPhaseReady) + assert.Equal(t, "s3:test.amazonaws.com/bucket/restic/volume-ns-1", rr.Spec.ResticIdentifier) } func TestRunMaintenanceIfDue(t *testing.T) { @@ -240,3 +252,129 @@ func TestGetRepositoryMaintenanceFrequency(t *testing.T) { }) } } + +func TestNeedInvalidBackupRepo(t *testing.T) { + tests := []struct { + name string + oldBSL *velerov1api.BackupStorageLocation + newBSL *velerov1api.BackupStorageLocation + expect bool + }{ + { + name: "no change", + oldBSL: &velerov1api.BackupStorageLocation{ + Spec: velerov1api.BackupStorageLocationSpec{ + Provider: "old-provider", + }, + }, + newBSL: &velerov1api.BackupStorageLocation{ + Spec: velerov1api.BackupStorageLocationSpec{ + Provider: "new-provider", + }, + }, + expect: false, + }, + { + name: "other part change", + oldBSL: &velerov1api.BackupStorageLocation{}, + newBSL: &velerov1api.BackupStorageLocation{}, + expect: false, + }, + { + name: "bucket change", + oldBSL: &velerov1api.BackupStorageLocation{ + Spec: velerov1api.BackupStorageLocationSpec{ + StorageType: velerov1api.StorageType{ + ObjectStorage: &velerov1api.ObjectStorageLocation{ + Bucket: "old-bucket", + }, + }, + }, + }, + newBSL: &velerov1api.BackupStorageLocation{ + Spec: velerov1api.BackupStorageLocationSpec{ + StorageType: velerov1api.StorageType{ + ObjectStorage: &velerov1api.ObjectStorageLocation{ + Bucket: "new-bucket", + }, + }, + }, + }, + expect: true, + }, + { + name: "prefix change", + oldBSL: &velerov1api.BackupStorageLocation{ + Spec: velerov1api.BackupStorageLocationSpec{ + 
StorageType: velerov1api.StorageType{ + ObjectStorage: &velerov1api.ObjectStorageLocation{ + Prefix: "old-prefix", + }, + }, + }, + }, + newBSL: &velerov1api.BackupStorageLocation{ + Spec: velerov1api.BackupStorageLocationSpec{ + StorageType: velerov1api.StorageType{ + ObjectStorage: &velerov1api.ObjectStorageLocation{ + Prefix: "new-prefix", + }, + }, + }, + }, + expect: true, + }, + { + name: "CACert change", + oldBSL: &velerov1api.BackupStorageLocation{ + Spec: velerov1api.BackupStorageLocationSpec{ + StorageType: velerov1api.StorageType{ + ObjectStorage: &velerov1api.ObjectStorageLocation{ + CACert: []byte{0x11, 0x12, 0x13}, + }, + }, + }, + }, + newBSL: &velerov1api.BackupStorageLocation{ + Spec: velerov1api.BackupStorageLocationSpec{ + StorageType: velerov1api.StorageType{ + ObjectStorage: &velerov1api.ObjectStorageLocation{ + CACert: []byte{0x21, 0x22, 0x23}, + }, + }, + }, + }, + expect: true, + }, + { + name: "config change", + oldBSL: &velerov1api.BackupStorageLocation{ + Spec: velerov1api.BackupStorageLocationSpec{ + Config: map[string]string{ + "key1": "value1", + }, + }, + }, + newBSL: &velerov1api.BackupStorageLocation{ + Spec: velerov1api.BackupStorageLocationSpec{ + Config: map[string]string{ + "key2": "value2", + }, + }, + }, + expect: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + reconciler := NewBackupRepoReconciler( + velerov1api.DefaultNamespace, + velerotest.NewLogger(), + velerotest.NewFakeControllerRuntimeClient(t), + time.Duration(0), nil) + + need := reconciler.needInvalidBackupRepo(test.oldBSL, test.newBSL) + assert.Equal(t, test.expect, need) + }) + } +} diff --git a/pkg/util/kube/event_handler.go b/pkg/util/kube/event_handler.go new file mode 100644 index 000000000..471a41c81 --- /dev/null +++ b/pkg/util/kube/event_handler.go @@ -0,0 +1,75 @@ +/* +Copyright the Velero contributors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kube + +import ( + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +type MapUpdateFunc func(client.Object) []reconcile.Request + +// EnqueueRequestsFromMapUpdateFunc serves the same purpose as EnqueueRequestsFromMapFunc. +// However, it is friendlier to a MapUpdateFunc that updates the mapped objects, because +// on an Update event the MapUpdateFunc is called only once, with the new object, so any update +// that the MapUpdateFunc makes to the mapped objects is performed only once. +func EnqueueRequestsFromMapUpdateFunc(fn MapUpdateFunc) handler.EventHandler { + return &enqueueRequestsFromMapFunc{ + toRequests: fn, + } +} + +var _ handler.EventHandler = &enqueueRequestsFromMapFunc{} + +type enqueueRequestsFromMapFunc struct { + toRequests MapUpdateFunc +} + +// Create implements EventHandler. +func (e *enqueueRequestsFromMapFunc) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { + e.mapAndEnqueue(q, evt.Object) +} + +// Update implements EventHandler. +func (e *enqueueRequestsFromMapFunc) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { + e.mapAndEnqueue(q, evt.ObjectNew) +} + +// Delete implements EventHandler. 
+func (e *enqueueRequestsFromMapFunc) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { + e.mapAndEnqueue(q, evt.Object) +} + +// Generic implements EventHandler. +func (e *enqueueRequestsFromMapFunc) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { + e.mapAndEnqueue(q, evt.Object) +} + +func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(q workqueue.RateLimitingInterface, object client.Object) { + reqs := map[reconcile.Request]struct{}{} + + for _, req := range e.toRequests(object) { + _, ok := reqs[req] + if !ok { + q.Add(req) + reqs[req] = struct{}{} + } + } +} diff --git a/pkg/util/kube/pod.go b/pkg/util/kube/pod.go index 5d0b0f7f0..85fb41325 100644 --- a/pkg/util/kube/pod.go +++ b/pkg/util/kube/pod.go @@ -37,7 +37,7 @@ func IsPodRunning(pod *corev1api.Pod) error { func IsPodScheduled(pod *corev1api.Pod) error { return isPodScheduledInStatus(pod, func(pod *corev1api.Pod) error { if pod.Status.Phase != corev1api.PodRunning && pod.Status.Phase != corev1api.PodPending { - return errors.New("pod is running or pending") + return errors.New("pod is not running or pending") } return nil diff --git a/pkg/util/kube/predicate.go b/pkg/util/kube/predicate.go index a9660b34e..7bda38868 100644 --- a/pkg/util/kube/predicate.go +++ b/pkg/util/kube/predicate.go @@ -74,6 +74,25 @@ func NewAllEventPredicate(f func(object client.Object) bool) predicate.Predicate } } +// NewUpdateEventPredicate creates a new Predicate that checks the update events with the provided func +// and ignore others +func NewUpdateEventPredicate(f func(client.Object, client.Object) bool) predicate.Predicate { + return predicate.Funcs{ + UpdateFunc: func(event event.UpdateEvent) bool { + return f(event.ObjectOld, event.ObjectNew) + }, + CreateFunc: func(event event.CreateEvent) bool { + return false + }, + DeleteFunc: func(event event.DeleteEvent) bool { + return false + }, + GenericFunc: func(event event.GenericEvent) bool { + return false + }, + } +} + // 
FalsePredicate always returns false for all kinds of events type FalsePredicate struct{}