refactor repo ensurer

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
Lyndon-Li
2022-09-07 17:57:52 +08:00
parent 8888f8765e
commit 8496b43e37
5 changed files with 237 additions and 144 deletions

View File

@@ -0,0 +1 @@
Refactor the repoEnsurer code to use controller runtime client and wrap some common BackupRepository operations to share with other modules

View File

@@ -547,7 +547,7 @@ func (s *server) initRestic() error {
}
s.repoLocker = repository.NewRepoLocker()
s.repoEnsurer = repository.NewRepositoryEnsurer(s.sharedInformerFactory.Velero().V1().BackupRepositories(), s.veleroClient.VeleroV1(), s.logger)
s.repoEnsurer = repository.NewRepositoryEnsurer(s.mgr.GetClient(), s.logger)
s.repoManager = repository.NewManager(s.namespace, s.mgr.GetClient(), s.repoLocker, s.repoEnsurer, s.credentialFileStore, s.credentialSecretStore, s.logger)

View File

@@ -18,8 +18,10 @@ package repository
import (
"context"
"fmt"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -35,7 +37,8 @@ type BackupRepositoryKey struct {
}
var (
backupRepoNotFoundError = errors.New("backup repository not found")
backupRepoNotFoundError = errors.New("backup repository not found")
backupRepoNotProvisionedError = errors.New("backup repository not provisioned")
)
func repoLabelsFromKey(key BackupRepositoryKey) labels.Set {
@@ -76,10 +79,37 @@ func GetBackupRepository(ctx context.Context, cli client.Client, namespace strin
repo := &backupRepoList.Items[0]
if ensureReady {
if repo.Status.Phase != velerov1api.BackupRepositoryPhaseReady {
if repo.Status.Phase == velerov1api.BackupRepositoryPhaseNotReady {
return nil, errors.Errorf("backup repository is not ready: %s", repo.Status.Message)
}
if repo.Status.Phase == velerov1api.BackupRepositoryPhaseNew {
return nil, backupRepoNotProvisionedError
}
}
return repo, nil
}
func newBackupRepository(namespace string, key BackupRepositoryKey) *velerov1api.BackupRepository {
return &velerov1api.BackupRepository{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
GenerateName: fmt.Sprintf("%s-%s-%s-", key.VolumeNamespace, key.BackupLocation, key.RepositoryType),
Labels: repoLabelsFromKey(key),
},
Spec: velerov1api.BackupRepositorySpec{
VolumeNamespace: key.VolumeNamespace,
BackupStorageLocation: key.BackupLocation,
RepositoryType: key.RepositoryType,
},
}
}
func isBackupRepositoryNotFoundError(err error) bool {
return (err == backupRepoNotFoundError)
}
func isBackupRepositoryNotProvisionedError(err error) bool {
return (err == backupRepoNotProvisionedError)
}

View File

@@ -0,0 +1,152 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package repository
import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"testing"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)
func buildBackupRepo(key BackupRepositoryKey, phase velerov1api.BackupRepositoryPhase, seqNum string) velerov1api.BackupRepository {
return velerov1api.BackupRepository{
Spec: velerov1api.BackupRepositorySpec{ResticIdentifier: ""},
TypeMeta: metav1.TypeMeta{
APIVersion: velerov1api.SchemeGroupVersion.String(),
Kind: "BackupRepository",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1api.DefaultNamespace,
Name: fmt.Sprintf("%s-%s-%s-%s", key.VolumeNamespace, key.BackupLocation, key.RepositoryType, seqNum),
Labels: map[string]string{
velerov1api.StorageLocationLabel: key.BackupLocation,
velerov1api.VolumeNamespaceLabel: key.VolumeNamespace,
velerov1api.RepositoryTypeLabel: key.RepositoryType,
},
},
Status: velerov1api.BackupRepositoryStatus{
Phase: phase,
},
}
}
func buildBackupRepoPointer(key BackupRepositoryKey, phase velerov1api.BackupRepositoryPhase, seqNum string) *velerov1api.BackupRepository {
value := buildBackupRepo(key, phase, seqNum)
return &value
}
func TestGetBackupRepository(t *testing.T) {
testCases := []struct {
name string
backupRepositories []velerov1api.BackupRepository
ensureReady bool
backupRepositoryKey BackupRepositoryKey
expected *velerov1api.BackupRepository
expectedErr string
}{
{
name: "repository not found",
expectedErr: "backup repository not found",
},
{
name: "found more than one repository",
backupRepositories: []velerov1api.BackupRepository{
buildBackupRepo(BackupRepositoryKey{"fake-volume-ns", "fake-bsl", "fake-repository-type"}, velerov1api.BackupRepositoryPhaseReady, "01"),
buildBackupRepo(BackupRepositoryKey{"fake-volume-ns", "fake-bsl", "fake-repository-type"}, velerov1api.BackupRepositoryPhaseReady, "02")},
backupRepositoryKey: BackupRepositoryKey{"fake-volume-ns", "fake-bsl", "fake-repository-type"},
expectedErr: "more than one BackupRepository found for workload namespace \"fake-volume-ns\", backup storage location \"fake-bsl\", repository type \"fake-repository-type\"",
},
{
name: "repository not ready, not expect ready",
backupRepositories: []velerov1api.BackupRepository{
buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-01", "fake-bsl-01", "fake-repository-type-01"}, velerov1api.BackupRepositoryPhaseReady, "01"),
buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, velerov1api.BackupRepositoryPhaseNotReady, "02")},
backupRepositoryKey: BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"},
expected: buildBackupRepoPointer(BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, velerov1api.BackupRepositoryPhaseNotReady, "02"),
},
{
name: "repository is new, not expect ready",
backupRepositories: []velerov1api.BackupRepository{
buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-01", "fake-bsl-01", "fake-repository-type-01"}, velerov1api.BackupRepositoryPhaseReady, "01"),
buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, velerov1api.BackupRepositoryPhaseNew, "02")},
backupRepositoryKey: BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"},
expected: buildBackupRepoPointer(BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, velerov1api.BackupRepositoryPhaseNew, "02"),
},
{
name: "repository not ready, expect ready",
backupRepositories: []velerov1api.BackupRepository{
buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-01", "fake-bsl-01", "fake-repository-type-01"}, velerov1api.BackupRepositoryPhaseReady, "01"),
buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, velerov1api.BackupRepositoryPhaseNotReady, "02")},
backupRepositoryKey: BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"},
ensureReady: true,
expectedErr: "backup repository is not ready: ",
},
{
name: "repository is new, expect ready",
backupRepositories: []velerov1api.BackupRepository{
buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-01", "fake-bsl-01", "fake-repository-type-01"}, velerov1api.BackupRepositoryPhaseReady, "01"),
buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, velerov1api.BackupRepositoryPhaseNew, "02")},
backupRepositoryKey: BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"},
ensureReady: true,
expectedErr: "backup repository not provisioned",
},
{
name: "repository ready, expect ready",
backupRepositories: []velerov1api.BackupRepository{
buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-01", "fake-bsl-01", "fake-repository-type-01"}, velerov1api.BackupRepositoryPhaseNotReady, "01"),
buildBackupRepo(BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, velerov1api.BackupRepositoryPhaseReady, "02")},
backupRepositoryKey: BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"},
ensureReady: true,
expected: buildBackupRepoPointer(BackupRepositoryKey{"fake-volume-ns-02", "fake-bsl-02", "fake-repository-type-02"}, velerov1api.BackupRepositoryPhaseReady, "02"),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
clientBuilder := velerotest.NewFakeControllerRuntimeClientBuilder(t)
clientBuilder.WithLists(&velerov1api.BackupRepositoryList{
Items: tc.backupRepositories,
})
fakeClient := clientBuilder.Build()
backupRepo, err := GetBackupRepository(context.Background(), fakeClient, velerov1api.DefaultNamespace, tc.backupRepositoryKey, tc.ensureReady)
if backupRepo != nil {
backupRepo.ResourceVersion = tc.expected.ResourceVersion
require.Equal(t, *tc.expected, *backupRepo)
} else {
require.Equal(t, tc.expected, backupRepo)
}
if tc.expectedErr == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, tc.expectedErr)
}
})
}
}

View File

@@ -18,92 +18,34 @@ package repository
import (
"context"
"fmt"
"sync"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
)
// RepositoryEnsurer ensures that backup repositories are created and ready.
type RepositoryEnsurer struct {
log logrus.FieldLogger
repoLister velerov1listers.BackupRepositoryLister
repoClient velerov1client.BackupRepositoriesGetter
repoChansLock sync.Mutex
repoChans map[string]chan *velerov1api.BackupRepository
repoClient client.Client
// repoLocksMu synchronizes reads/writes to the repoLocks map itself
// since maps are not threadsafe.
repoLocksMu sync.Mutex
repoLocks map[repoKey]*sync.Mutex
repoLocks map[BackupRepositoryKey]*sync.Mutex
}
type repoKey struct {
volumeNamespace string
backupLocation string
repositoryType string
}
func NewRepositoryEnsurer(repoInformer velerov1informers.BackupRepositoryInformer, repoClient velerov1client.BackupRepositoriesGetter, log logrus.FieldLogger) *RepositoryEnsurer {
r := &RepositoryEnsurer{
func NewRepositoryEnsurer(repoClient client.Client, log logrus.FieldLogger) *RepositoryEnsurer {
return &RepositoryEnsurer{
log: log,
repoLister: repoInformer.Lister(),
repoClient: repoClient,
repoChans: make(map[string]chan *velerov1api.BackupRepository),
repoLocks: make(map[repoKey]*sync.Mutex),
}
repoInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, upd interface{}) {
oldObj := old.(*velerov1api.BackupRepository)
newObj := upd.(*velerov1api.BackupRepository)
// we're only interested in phase-changing updates
if oldObj.Status.Phase == newObj.Status.Phase {
return
}
// we're only interested in updates where the updated object is either Ready or NotReady
if newObj.Status.Phase != velerov1api.BackupRepositoryPhaseReady && newObj.Status.Phase != velerov1api.BackupRepositoryPhaseNotReady {
return
}
r.repoChansLock.Lock()
defer r.repoChansLock.Unlock()
key := repoLabels(newObj.Spec.VolumeNamespace, newObj.Spec.BackupStorageLocation, newObj.Spec.RepositoryType).String()
repoChan, ok := r.repoChans[key]
if !ok {
log.Debugf("No ready channel found for repository %s/%s", newObj.Namespace, newObj.Name)
return
}
repoChan <- newObj
},
},
)
return r
}
func repoLabels(volumeNamespace, backupLocation, repositoryType string) labels.Set {
return map[string]string{
velerov1api.VolumeNamespaceLabel: label.GetValidName(volumeNamespace),
velerov1api.StorageLocationLabel: label.GetValidName(backupLocation),
velerov1api.RepositoryTypeLabel: label.GetValidName(repositoryType),
repoLocks: make(map[BackupRepositoryKey]*sync.Mutex),
}
}
@@ -112,112 +54,80 @@ func (r *RepositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNam
return nil, errors.Errorf("wrong parameters, namespace %q, backup storage location %q, repository type %q", volumeNamespace, backupLocation, repositoryType)
}
backupRepoKey := BackupRepositoryKey{volumeNamespace, backupLocation, repositoryType}
log := r.log.WithField("volumeNamespace", volumeNamespace).WithField("backupLocation", backupLocation).WithField("repositoryType", repositoryType)
// It's only safe to have one instance of this method executing concurrently for a
// given volumeNamespace + backupLocation + repositoryType, so synchronize based on that. It's fine
// to run concurrently for *different* namespaces/locations. If you had 2 goroutines
// given BackupRepositoryKey, so synchronize based on that. It's fine
// to run concurrently for *different* BackupRepositoryKey. If you had 2 goroutines
// running this for the same inputs, both might find no BackupRepository exists, then
// both would create new ones for the same namespace/location.
// both would create new ones for the same BackupRepositoryKey.
//
// This issue could probably be avoided if we had a deterministic name for
// each restic repository, and we just tried to create it, checked for an
// each BackupRepository and we just tried to create it, checked for an
// AlreadyExists err, and then waited for it to be ready. However, there are
// already repositories in the wild with non-deterministic names (i.e. using
// GenerateName) which poses a backwards compatibility problem.
log.Debug("Acquiring lock")
repoMu := r.repoLock(volumeNamespace, backupLocation, repositoryType)
repoMu := r.repoLock(backupRepoKey)
repoMu.Lock()
defer func() {
repoMu.Unlock()
log.Debug("Released lock")
}()
log.Debug("Acquired lock")
selector := labels.SelectorFromSet(repoLabels(volumeNamespace, backupLocation, repositoryType))
repos, err := r.repoLister.BackupRepositories(namespace).List(selector)
if err != nil {
return nil, errors.WithStack(err)
}
if len(repos) > 1 {
return nil, errors.Errorf("more than one BackupRepository found for workload namespace %q, backup storage location %q, repository type %q", volumeNamespace, backupLocation, repositoryType)
}
if len(repos) == 1 {
if repos[0].Status.Phase != velerov1api.BackupRepositoryPhaseReady {
return nil, errors.Errorf("restic repository is not ready: %s", repos[0].Status.Message)
}
repo, err := GetBackupRepository(ctx, r.repoClient, namespace, backupRepoKey, true)
if err == nil {
log.Debug("Ready repository found")
return repos[0], nil
return repo, nil
}
if !isBackupRepositoryNotFoundError(err) {
return nil, errors.WithStack(err)
}
log.Debug("No repository found, creating one")
// no repo found: create one and wait for it to be ready
repo := &velerov1api.BackupRepository{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
GenerateName: fmt.Sprintf("%s-%s-%s-", volumeNamespace, backupLocation, repositoryType),
Labels: repoLabels(volumeNamespace, backupLocation, repositoryType),
},
Spec: velerov1api.BackupRepositorySpec{
VolumeNamespace: volumeNamespace,
BackupStorageLocation: backupLocation,
RepositoryType: repositoryType,
},
}
repoChan := r.getRepoChan(selector.String())
defer func() {
delete(r.repoChans, selector.String())
close(repoChan)
}()
if _, err := r.repoClient.BackupRepositories(namespace).Create(context.TODO(), repo, metav1.CreateOptions{}); err != nil {
return nil, errors.Wrapf(err, "unable to create restic repository resource")
}
select {
// repositories should become either ready or not ready quickly if they're
// newly created.
case <-time.After(time.Minute):
return nil, errors.New("timed out waiting for restic repository to become ready")
case <-ctx.Done():
return nil, errors.New("timed out waiting for restic repository to become ready")
case res := <-repoChan:
if res.Status.Phase == velerov1api.BackupRepositoryPhaseNotReady {
return nil, errors.Errorf("restic repository is not ready: %s", res.Status.Message)
}
return res, nil
}
return r.createBackupRepositoryAndWait(ctx, namespace, backupRepoKey)
}
func (r *RepositoryEnsurer) getRepoChan(name string) chan *velerov1api.BackupRepository {
r.repoChansLock.Lock()
defer r.repoChansLock.Unlock()
r.repoChans[name] = make(chan *velerov1api.BackupRepository)
return r.repoChans[name]
}
func (r *RepositoryEnsurer) repoLock(volumeNamespace, backupLocation, repositoryType string) *sync.Mutex {
func (r *RepositoryEnsurer) repoLock(key BackupRepositoryKey) *sync.Mutex {
r.repoLocksMu.Lock()
defer r.repoLocksMu.Unlock()
key := repoKey{
volumeNamespace: volumeNamespace,
backupLocation: backupLocation,
repositoryType: repositoryType,
}
if r.repoLocks[key] == nil {
r.repoLocks[key] = new(sync.Mutex)
}
return r.repoLocks[key]
}
func (r *RepositoryEnsurer) createBackupRepositoryAndWait(ctx context.Context, namespace string, backupRepoKey BackupRepositoryKey) (*velerov1api.BackupRepository, error) {
toCreate := newBackupRepository(namespace, backupRepoKey)
if err := r.repoClient.Create(ctx, toCreate, &client.CreateOptions{}); err != nil {
return nil, errors.Wrap(err, "unable to create backup repository resource")
}
var repo *velerov1api.BackupRepository
checkFunc := func(ctx context.Context) (bool, error) {
found, err := GetBackupRepository(ctx, r.repoClient, namespace, backupRepoKey, true)
if err == nil {
repo = found
return true, nil
} else if isBackupRepositoryNotFoundError(err) || isBackupRepositoryNotProvisionedError(err) {
return false, nil
} else {
return false, err
}
}
err := wait.PollWithContext(ctx, time.Millisecond*500, time.Minute, checkFunc)
if err != nil {
return nil, errors.Wrap(err, "failed to wait BackupRepository")
} else {
return repo, nil
}
}