From b3157190866b3c6bdc6e125c8ab03c069a5928f3 Mon Sep 17 00:00:00 2001
From: Steve Kriss
Date: Mon, 25 Feb 2019 13:06:22 -0800
Subject: [PATCH] pkg/restic: fix concurrency bugs causing dupe repos, panics

Signed-off-by: Steve Kriss
---
 changelogs/unreleased/1235-skriss |  1 +
 pkg/restic/repository_ensurer.go  | 60 +++++++++++++++++++++++++++++--
 2 files changed, 59 insertions(+), 2 deletions(-)
 create mode 100644 changelogs/unreleased/1235-skriss

diff --git a/changelogs/unreleased/1235-skriss b/changelogs/unreleased/1235-skriss
new file mode 100644
index 000000000..a2c55f75c
--- /dev/null
+++ b/changelogs/unreleased/1235-skriss
@@ -0,0 +1 @@
+Fix concurrency bug in code ensuring restic repository exists
diff --git a/pkg/restic/repository_ensurer.go b/pkg/restic/repository_ensurer.go
index 3810c7ec2..df5117e94 100644
--- a/pkg/restic/repository_ensurer.go
+++ b/pkg/restic/repository_ensurer.go
@@ -35,18 +35,31 @@ import (
 
 // repositoryEnsurer ensures that Ark restic repositories are created and ready.
 type repositoryEnsurer struct {
+	log        logrus.FieldLogger
 	repoLister arkv1listers.ResticRepositoryLister
 	repoClient arkv1client.ResticRepositoriesGetter
 
 	readyChansLock sync.Mutex
 	readyChans     map[string]chan *arkv1api.ResticRepository
+
+	// repoLocksMu synchronizes reads/writes to the repoLocks map itself
+	// since maps are not threadsafe.
+	repoLocksMu sync.Mutex
+	repoLocks   map[repoKey]*sync.Mutex
+}
+
+type repoKey struct {
+	volumeNamespace string
+	backupLocation  string
 }
 
 func newRepositoryEnsurer(repoInformer arkv1informers.ResticRepositoryInformer, repoClient arkv1client.ResticRepositoriesGetter, log logrus.FieldLogger) *repositoryEnsurer {
 	r := &repositoryEnsurer{
+		log:        log,
 		repoLister: repoInformer.Lister(),
 		repoClient: repoClient,
 		readyChans: make(map[string]chan *arkv1api.ResticRepository),
+		repoLocks:  make(map[repoKey]*sync.Mutex),
 	}
 
 	repoInformer.Informer().AddEventHandler(
@@ -67,7 +80,7 @@ func newRepositoryEnsurer(repoInformer arkv1informers.ResticRepositoryInformer,
 					}
 
 					readyChan <- newObj
-					delete(r.readyChans, newObj.Name)
+					delete(r.readyChans, key)
 				}
 			},
 		},
@@ -84,6 +97,30 @@ func repoLabels(volumeNamespace, backupLocation string) labels.Set {
 }
 
 func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNamespace, backupLocation string) (*arkv1api.ResticRepository, error) {
+	log := r.log.WithField("volumeNamespace", volumeNamespace).WithField("backupLocation", backupLocation)
+
+	// It's only safe to have one instance of this method executing concurrently for a
+	// given volumeNamespace + backupLocation, so synchronize based on that. It's fine
+	// to run concurrently for *different* namespaces/locations. If you had 2 goroutines
+	// running this for the same inputs, both might find no ResticRepository exists, then
+	// both would create new ones for the same namespace/location.
+	//
+	// This issue could probably be avoided if we had a deterministic name for
+	// each restic repository, and we just tried to create it, checked for an
+	// AlreadyExists err, and then waited for it to be ready. However, there are
+	// already repositories in the wild with non-deterministic names (i.e. using
+	// GenerateName) which poses a backwards compatibility problem.
+	log.Debug("Acquiring lock")
+
+	repoMu := r.repoLock(volumeNamespace, backupLocation)
+	repoMu.Lock()
+	defer func() {
+		repoMu.Unlock()
+		log.Debug("Released lock")
+	}()
+
+	log.Debug("Acquired lock")
+
 	selector := labels.SelectorFromSet(repoLabels(volumeNamespace, backupLocation))
 
 	repos, err := r.repoLister.ResticRepositories(namespace).List(selector)
@@ -97,11 +134,14 @@ func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNam
 		if repos[0].Status.Phase != arkv1api.ResticRepositoryPhaseReady {
 			return nil, errors.New("restic repository is not ready")
 		}
+
+		log.Debug("Ready repository found")
 		return repos[0], nil
 	}
 
-	// no repo found: create one and wait for it to be ready
+	log.Debug("No repository found, creating one")
 
+	// no repo found: create one and wait for it to be ready
 	repo := &arkv1api.ResticRepository{
 		ObjectMeta: metav1.ObjectMeta{
 			Namespace: namespace,
@@ -137,3 +177,19 @@ func (r *repositoryEnsurer) getReadyChan(name string) chan *arkv1api.ResticRepos
 	r.readyChans[name] = make(chan *arkv1api.ResticRepository)
 	return r.readyChans[name]
 }
+
+func (r *repositoryEnsurer) repoLock(volumeNamespace, backupLocation string) *sync.Mutex {
+	r.repoLocksMu.Lock()
+	defer r.repoLocksMu.Unlock()
+
+	key := repoKey{
+		volumeNamespace: volumeNamespace,
+		backupLocation:  backupLocation,
+	}
+
+	if r.repoLocks[key] == nil {
+		r.repoLocks[key] = new(sync.Mutex)
+	}
+
+	return r.repoLocks[key]
+}