From 8496b43e37fc0e97eac03ef61c2b162c4a78f7da Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Wed, 7 Sep 2022 17:57:52 +0800 Subject: [PATCH] refactor repo ensurer Signed-off-by: Lyndon-Li --- changelogs/unreleased/5308-lyndon | 1 + pkg/cmd/server/server.go | 2 +- pkg/repository/backup_repo_op.go | 34 ++++- pkg/repository/backup_repo_op_test.go | 152 ++++++++++++++++++++ pkg/repository/ensurer.go | 192 +++++++------------------- 5 files changed, 237 insertions(+), 144 deletions(-) create mode 100644 changelogs/unreleased/5308-lyndon create mode 100644 pkg/repository/backup_repo_op_test.go diff --git a/changelogs/unreleased/5308-lyndon b/changelogs/unreleased/5308-lyndon new file mode 100644 index 000000000..5e9c4a544 --- /dev/null +++ b/changelogs/unreleased/5308-lyndon @@ -0,0 +1 @@ +Refactor the repoEnsurer code to use controller runtime client and wrap some common BackupRepository operations to share with other modules \ No newline at end of file diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 2253d9dbc..932b2fa7d 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -547,7 +547,7 @@ func (s *server) initRestic() error { } s.repoLocker = repository.NewRepoLocker() - s.repoEnsurer = repository.NewRepositoryEnsurer(s.sharedInformerFactory.Velero().V1().BackupRepositories(), s.veleroClient.VeleroV1(), s.logger) + s.repoEnsurer = repository.NewRepositoryEnsurer(s.mgr.GetClient(), s.logger) s.repoManager = repository.NewManager(s.namespace, s.mgr.GetClient(), s.repoLocker, s.repoEnsurer, s.credentialFileStore, s.credentialSecretStore, s.logger) diff --git a/pkg/repository/backup_repo_op.go b/pkg/repository/backup_repo_op.go index b60feb4fa..237d8ddaf 100644 --- a/pkg/repository/backup_repo_op.go +++ b/pkg/repository/backup_repo_op.go @@ -18,8 +18,10 @@ package repository import ( "context" + "fmt" "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "sigs.k8s.io/controller-runtime/pkg/client" @@ -35,7 +37,8 @@ type BackupRepositoryKey struct { } var ( - backupRepoNotFoundError = errors.New("backup repository not found") + backupRepoNotFoundError = errors.New("backup repository not found") + backupRepoNotProvisionedError = errors.New("backup repository not provisioned") ) func repoLabelsFromKey(key BackupRepositoryKey) labels.Set { @@ -76,10 +79,37 @@ func GetBackupRepository(ctx context.Context, cli client.Client, namespace strin repo := &backupRepoList.Items[0] if ensureReady { - if repo.Status.Phase != velerov1api.BackupRepositoryPhaseReady { + if repo.Status.Phase == velerov1api.BackupRepositoryPhaseNotReady { return nil, errors.Errorf("backup repository is not ready: %s", repo.Status.Message) } + + if repo.Status.Phase == velerov1api.BackupRepositoryPhaseNew { + return nil, backupRepoNotProvisionedError + } } return repo, nil } + +func newBackupRepository(namespace string, key BackupRepositoryKey) *velerov1api.BackupRepository { + return &velerov1api.BackupRepository{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + GenerateName: fmt.Sprintf("%s-%s-%s-", key.VolumeNamespace, key.BackupLocation, key.RepositoryType), + Labels: repoLabelsFromKey(key), + }, + Spec: velerov1api.BackupRepositorySpec{ + VolumeNamespace: key.VolumeNamespace, + BackupStorageLocation: key.BackupLocation, + RepositoryType: key.RepositoryType, + }, + } +} + +func isBackupRepositoryNotFoundError(err error) bool { + return (err == backupRepoNotFoundError) +} + +func isBackupRepositoryNotProvisionedError(err error) bool { + return (err == backupRepoNotProvisionedError) +} diff --git a/pkg/repository/backup_repo_op_test.go b/pkg/repository/backup_repo_op_test.go new file mode 100644 index 000000000..104c4d616 --- /dev/null +++ b/pkg/repository/backup_repo_op_test.go @@ -0,0 +1,152 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package repository + +import ( + "context" + "fmt" + + "github.com/stretchr/testify/assert" + + "testing" + + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + + velerotest "github.com/vmware-tanzu/velero/pkg/test" +) + +func buildBackupRepo(key BackupRepositoryKey, phase velerov1api.BackupRepositoryPhase, seqNum string) velerov1api.BackupRepository { + return velerov1api.BackupRepository{ + Spec: velerov1api.BackupRepositorySpec{ResticIdentifier: ""}, + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov1api.SchemeGroupVersion.String(), + Kind: "BackupRepository", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1api.DefaultNamespace, + Name: fmt.Sprintf("%s-%s-%s-%s", key.VolumeNamespace, key.BackupLocation, key.RepositoryType, seqNum), + Labels: map[string]string{ + velerov1api.StorageLocationLabel: key.BackupLocation, + velerov1api.VolumeNamespaceLabel: key.VolumeNamespace, + velerov1api.RepositoryTypeLabel: key.RepositoryType, + }, + }, + Status: velerov1api.BackupRepositoryStatus{ + Phase: phase, + }, + } +} + +func buildBackupRepoPointer(key BackupRepositoryKey, phase velerov1api.BackupRepositoryPhase, seqNum string) *velerov1api.BackupRepository { + value := buildBackupRepo(key, phase, seqNum) + return &value +} + +func TestGetBackupRepository(t *testing.T) { + testCases := []struct { + name string + backupRepositories []velerov1api.BackupRepository + ensureReady bool + backupRepositoryKey BackupRepositoryKey + expected *velerov1api.BackupRepository + expectedErr string + }{ + { + name: "repository not found", + expectedErr: "backup repository not found", + }, + { + name: "found more than one repository", + backupRepositories: []velerov1api.BackupRepository{ + buildBackupRepo(BackupRepositoryKey{"fake-volume-ns", "fake-bsl", "fake-repository-type"}, velerov1api.BackupRepositoryPhaseReady, "01"), + buildBackupRepo(BackupRepositoryKey{"fake-volume-ns", "fake-bsl", "fake-repository-type"}, velerov1api.BackupRepositoryPhaseReady, "02")}, + backupRepositoryKey: BackupRepositoryKey{"fake-volume-ns", "fake-bsl", "fake-repository-type"}, + expectedErr: "more than one BackupRepository found for workload namespace \"fake-volume-ns\", backup storage location \"fake-bsl\", repository type \"fake-repository-type\"", + }, + { + name: "repository not ready, not expect ready", + backupRepositories: []velerov1api.BackupRepository{ + buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-01", "fake-bsl-01", "fake-repository-type-01"}, velerov1api.BackupRepositoryPhaseReady, "01"), + buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, velerov1api.BackupRepositoryPhaseNotReady, "02")}, + backupRepositoryKey: BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, + expected: buildBackupRepoPointer(BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, velerov1api.BackupRepositoryPhaseNotReady, "02"), + }, + { + name: "repository is new, not expect ready", + backupRepositories: []velerov1api.BackupRepository{ + buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-01", "fake-bsl-01", "fake-repository-type-01"}, velerov1api.BackupRepositoryPhaseReady, "01"), + buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, velerov1api.BackupRepositoryPhaseNew, "02")}, + backupRepositoryKey: BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, + expected: buildBackupRepoPointer(BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, velerov1api.BackupRepositoryPhaseNew, "02"), + }, + { + name: "repository not ready, expect ready", + backupRepositories: []velerov1api.BackupRepository{ + buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-01", "fake-bsl-01", "fake-repository-type-01"}, velerov1api.BackupRepositoryPhaseReady, "01"), + buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, velerov1api.BackupRepositoryPhaseNotReady, "02")}, + backupRepositoryKey: BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, + ensureReady: true, + expectedErr: "backup repository is not ready: ", + }, + { + name: "repository is new, expect ready", + backupRepositories: []velerov1api.BackupRepository{ + buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-01", "fake-bsl-01", "fake-repository-type-01"}, velerov1api.BackupRepositoryPhaseReady, "01"), + buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, velerov1api.BackupRepositoryPhaseNew, "02")}, + backupRepositoryKey: BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, + ensureReady: true, + expectedErr: "backup repository not provisioned", + }, + { + name: "repository ready, expect ready", + backupRepositories: []velerov1api.BackupRepository{ + buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-01", "fake-bsl-01", "fake-repository-type-01"}, velerov1api.BackupRepositoryPhaseNotReady, "01"), + buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, velerov1api.BackupRepositoryPhaseReady, "02")}, + backupRepositoryKey: BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, + ensureReady: true, + expected: buildBackupRepoPointer(BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, velerov1api.BackupRepositoryPhaseReady, "02"), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + clientBuilder := velerotest.NewFakeControllerRuntimeClientBuilder(t) + clientBuilder.WithLists(&velerov1api.BackupRepositoryList{ + Items: tc.backupRepositories, + }) + fakeClient := clientBuilder.Build() + + backupRepo, err := GetBackupRepository(context.Background(), fakeClient, velerov1api.DefaultNamespace, tc.backupRepositoryKey, tc.ensureReady) + + if backupRepo != nil { + backupRepo.ResourceVersion = tc.expected.ResourceVersion + require.Equal(t, *tc.expected, *backupRepo) + } else { + require.Equal(t, tc.expected, backupRepo) + } + + if tc.expectedErr == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, tc.expectedErr) + } + }) + } +} diff --git a/pkg/repository/ensurer.go b/pkg/repository/ensurer.go index 7a7e48d9a..5527ac742 100644 --- a/pkg/repository/ensurer.go +++ b/pkg/repository/ensurer.go @@ -18,92 +18,34 @@ package repository import ( "context" - "fmt" "sync" "time" "github.com/pkg/errors" "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/tools/cache" + "k8s.io/apimachinery/pkg/util/wait" + + "sigs.k8s.io/controller-runtime/pkg/client" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" - velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1" - velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1" - velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1" - "github.com/vmware-tanzu/velero/pkg/label" ) // RepositoryEnsurer ensures that backup repositories are created and ready. type RepositoryEnsurer struct { log logrus.FieldLogger - repoLister velerov1listers.BackupRepositoryLister - repoClient velerov1client.BackupRepositoriesGetter - - repoChansLock sync.Mutex - repoChans map[string]chan *velerov1api.BackupRepository + repoClient client.Client // repoLocksMu synchronizes reads/writes to the repoLocks map itself // since maps are not threadsafe. repoLocksMu sync.Mutex - repoLocks map[repoKey]*sync.Mutex + repoLocks map[BackupRepositoryKey]*sync.Mutex } -type repoKey struct { - volumeNamespace string - backupLocation string - repositoryType string -} - -func NewRepositoryEnsurer(repoInformer velerov1informers.BackupRepositoryInformer, repoClient velerov1client.BackupRepositoriesGetter, log logrus.FieldLogger) *RepositoryEnsurer { - r := &RepositoryEnsurer{ +func NewRepositoryEnsurer(repoClient client.Client, log logrus.FieldLogger) *RepositoryEnsurer { + return &RepositoryEnsurer{ log: log, - repoLister: repoInformer.Lister(), repoClient: repoClient, - repoChans: make(map[string]chan *velerov1api.BackupRepository), - repoLocks: make(map[repoKey]*sync.Mutex), - } - - repoInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - UpdateFunc: func(old, upd interface{}) { - oldObj := old.(*velerov1api.BackupRepository) - newObj := upd.(*velerov1api.BackupRepository) - - // we're only interested in phase-changing updates - if oldObj.Status.Phase == newObj.Status.Phase { - return - } - - // we're only interested in updates where the updated object is either Ready or NotReady - if newObj.Status.Phase != velerov1api.BackupRepositoryPhaseReady && newObj.Status.Phase != velerov1api.BackupRepositoryPhaseNotReady { - return - } - - r.repoChansLock.Lock() - defer r.repoChansLock.Unlock() - - key := repoLabels(newObj.Spec.VolumeNamespace, newObj.Spec.BackupStorageLocation, newObj.Spec.RepositoryType).String() - repoChan, ok := r.repoChans[key] - if !ok { - log.Debugf("No ready channel found for repository %s/%s", newObj.Namespace, newObj.Name) - return - } - - repoChan <- newObj - }, - }, - ) - - return r -} - -func repoLabels(volumeNamespace, backupLocation, repositoryType string) labels.Set { - return map[string]string{ - velerov1api.VolumeNamespaceLabel: label.GetValidName(volumeNamespace), - velerov1api.StorageLocationLabel: label.GetValidName(backupLocation), - velerov1api.RepositoryTypeLabel: label.GetValidName(repositoryType), + repoLocks: make(map[BackupRepositoryKey]*sync.Mutex), } } @@ -112,112 +54,80 @@ func (r *RepositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNam return nil, errors.Errorf("wrong parameters, namespace %q, backup storage location %q, repository type %q", volumeNamespace, backupLocation, repositoryType) } + backupRepoKey := BackupRepositoryKey{volumeNamespace, backupLocation, repositoryType} + log := r.log.WithField("volumeNamespace", volumeNamespace).WithField("backupLocation", backupLocation).WithField("repositoryType", repositoryType) // It's only safe to have one instance of this method executing concurrently for a - // given volumeNamespace + backupLocation + repositoryType, so synchronize based on that. It's fine - // to run concurrently for *different* namespaces/locations. If you had 2 goroutines + // given BackupRepositoryKey, so synchronize based on that. It's fine + // to run concurrently for *different* BackupRepositoryKey. If you had 2 goroutines // running this for the same inputs, both might find no BackupRepository exists, then - // both would create new ones for the same namespace/location. + // both would create new ones for the same BackupRepositoryKey. // // This issue could probably be avoided if we had a deterministic name for - // each restic repository, and we just tried to create it, checked for an + // each BackupRepository and we just tried to create it, checked for an // AlreadyExists err, and then waited for it to be ready. However, there are // already repositories in the wild with non-deterministic names (i.e. using // GenerateName) which poses a backwards compatibility problem. log.Debug("Acquiring lock") - repoMu := r.repoLock(volumeNamespace, backupLocation, repositoryType) + repoMu := r.repoLock(backupRepoKey) repoMu.Lock() defer func() { repoMu.Unlock() log.Debug("Released lock") }() - log.Debug("Acquired lock") - - selector := labels.SelectorFromSet(repoLabels(volumeNamespace, backupLocation, repositoryType)) - - repos, err := r.repoLister.BackupRepositories(namespace).List(selector) - if err != nil { - return nil, errors.WithStack(err) - } - if len(repos) > 1 { - return nil, errors.Errorf("more than one BackupRepository found for workload namespace %q, backup storage location %q, repository type %q", volumeNamespace, backupLocation, repositoryType) - } - if len(repos) == 1 { - if repos[0].Status.Phase != velerov1api.BackupRepositoryPhaseReady { - return nil, errors.Errorf("restic repository is not ready: %s", repos[0].Status.Message) - } - + repo, err := GetBackupRepository(ctx, r.repoClient, namespace, backupRepoKey, true) + if err == nil { log.Debug("Ready repository found") - return repos[0], nil + return repo, nil + } + + if !isBackupRepositoryNotFoundError(err) { + return nil, errors.WithStack(err) } log.Debug("No repository found, creating one") // no repo found: create one and wait for it to be ready - repo := &velerov1api.BackupRepository{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: namespace, - GenerateName: fmt.Sprintf("%s-%s-%s-", volumeNamespace, backupLocation, repositoryType), - Labels: repoLabels(volumeNamespace, backupLocation, repositoryType), - }, - Spec: velerov1api.BackupRepositorySpec{ - VolumeNamespace: volumeNamespace, - BackupStorageLocation: backupLocation, - RepositoryType: repositoryType, - }, - } - - repoChan := r.getRepoChan(selector.String()) - defer func() { - delete(r.repoChans, selector.String()) - close(repoChan) - }() - - if _, err := r.repoClient.BackupRepositories(namespace).Create(context.TODO(), repo, metav1.CreateOptions{}); err != nil { - return nil, errors.Wrapf(err, "unable to create restic repository resource") - } - - select { - // repositories should become either ready or not ready quickly if they're - // newly created. - case <-time.After(time.Minute): - return nil, errors.New("timed out waiting for restic repository to become ready") - case <-ctx.Done(): - return nil, errors.New("timed out waiting for restic repository to become ready") - case res := <-repoChan: - - if res.Status.Phase == velerov1api.BackupRepositoryPhaseNotReady { - return nil, errors.Errorf("restic repository is not ready: %s", res.Status.Message) - } - - return res, nil - } + return r.createBackupRepositoryAndWait(ctx, namespace, backupRepoKey) } -func (r *RepositoryEnsurer) getRepoChan(name string) chan *velerov1api.BackupRepository { - r.repoChansLock.Lock() - defer r.repoChansLock.Unlock() - - r.repoChans[name] = make(chan *velerov1api.BackupRepository) - return r.repoChans[name] -} - -func (r *RepositoryEnsurer) repoLock(volumeNamespace, backupLocation, repositoryType string) *sync.Mutex { +func (r *RepositoryEnsurer) repoLock(key BackupRepositoryKey) *sync.Mutex { r.repoLocksMu.Lock() defer r.repoLocksMu.Unlock() - key := repoKey{ - volumeNamespace: volumeNamespace, - backupLocation: backupLocation, - repositoryType: repositoryType, - } - if r.repoLocks[key] == nil { r.repoLocks[key] = new(sync.Mutex) } return r.repoLocks[key] } + +func (r *RepositoryEnsurer) createBackupRepositoryAndWait(ctx context.Context, namespace string, backupRepoKey BackupRepositoryKey) (*velerov1api.BackupRepository, error) { + toCreate := newBackupRepository(namespace, backupRepoKey) + if err := r.repoClient.Create(ctx, toCreate, &client.CreateOptions{}); err != nil { + return nil, errors.Wrap(err, "unable to create backup repository resource") + } + + var repo *velerov1api.BackupRepository + checkFunc := func(ctx context.Context) (bool, error) { + found, err := GetBackupRepository(ctx, r.repoClient, namespace, backupRepoKey, true) + if err == nil { + repo = found + return true, nil + } else if isBackupRepositoryNotFoundError(err) || isBackupRepositoryNotProvisionedError(err) { + return false, nil + } else { + return false, err + } + } + + err := wait.PollWithContext(ctx, time.Millisecond*500, time.Minute, checkFunc) + if err != nil { + return nil, errors.Wrap(err, "failed to wait BackupRepository") + } else { + return repo, nil + } +}