From 22e8f23e2c14cd1efc3ecae8e63f5670ba4272a1 Mon Sep 17 00:00:00 2001 From: Steve Kriss Date: Fri, 22 Jun 2018 12:07:23 -0700 Subject: [PATCH] replace ark restic repo init cmd with automatic initialization of repos Signed-off-by: Steve Kriss --- docs/cli-reference/ark_restic_repo.md | 1 - docs/cli-reference/ark_restic_repo_init.md | 41 ----- pkg/cmd/cli/restic/repo/init.go | 165 ------------------ pkg/cmd/cli/restic/repo/init_test.go | 88 ---------- pkg/cmd/cli/restic/repo/repo.go | 1 - pkg/cmd/cli/restic/server.go | 26 ++- pkg/cmd/server/server.go | 30 ++-- pkg/controller/backup_deletion_controller.go | 36 +++- .../pod_volume_backup_controller.go | 8 +- .../pod_volume_restore_controller.go | 10 +- .../restic_repository_controller.go | 16 +- pkg/restic/backupper.go | 54 ++---- pkg/restic/command_factory.go | 16 ++ pkg/restic/common.go | 10 +- pkg/restic/exec_commands.go | 16 ++ pkg/restic/repository_ensurer.go | 122 +++++++++++++ pkg/restic/repository_keys.go | 41 ++++- pkg/restic/repository_manager.go | 84 ++++----- pkg/restic/restorer.go | 28 +-- 19 files changed, 360 insertions(+), 433 deletions(-) delete mode 100644 docs/cli-reference/ark_restic_repo_init.md delete mode 100644 pkg/cmd/cli/restic/repo/init.go delete mode 100644 pkg/cmd/cli/restic/repo/init_test.go create mode 100644 pkg/restic/repository_ensurer.go diff --git a/docs/cli-reference/ark_restic_repo.md b/docs/cli-reference/ark_restic_repo.md index 43285031b..ae41d6ddb 100644 --- a/docs/cli-reference/ark_restic_repo.md +++ b/docs/cli-reference/ark_restic_repo.md @@ -31,5 +31,4 @@ Work with restic repositories ### SEE ALSO * [ark restic](ark_restic.md) - Work with restic * [ark restic repo get](ark_restic_repo_get.md) - Get restic repositories -* [ark restic repo init](ark_restic_repo_init.md) - initialize a restic repository for a specified namespace diff --git a/docs/cli-reference/ark_restic_repo_init.md b/docs/cli-reference/ark_restic_repo_init.md deleted file mode 100644 index c0c171608..000000000 --- a/docs/cli-reference/ark_restic_repo_init.md +++ /dev/null @@ -1,41 +0,0 @@ -## ark restic repo init - -initialize a restic repository for a specified namespace - -### Synopsis - - -initialize a restic repository for a specified namespace - -``` -ark restic repo init NAMESPACE [flags] -``` - -### Options - -``` - -h, --help help for init - --key-data string Encryption key for the restic repository. Optional; if unset, Ark will generate a random key for you. - --key-file string Path to file containing the encryption key for the restic repository. Optional; if unset, Ark will generate a random key for you. - --key-size int Size of the generated key for the restic repository (default 1024) - --maintenance-frequency duration How often maintenance (i.e. restic prune & check) is run on the repository (default 24h0m0s) -``` - -### Options inherited from parent commands - -``` - --alsologtostderr log to standard error as well as files - --kubeconfig string Path to the kubeconfig file to use to talk to the Kubernetes apiserver. If unset, try the environment variable KUBECONFIG, as well as in-cluster configuration - --kubecontext string The context to use to talk to the Kubernetes apiserver. If unset defaults to whatever your current-context is (kubectl config current-context) - --log_backtrace_at traceLocation when logging hits line file:N, emit a stack trace (default :0) - --log_dir string If non-empty, write log files in this directory - --logtostderr log to standard error instead of files - -n, --namespace string The namespace in which Ark should operate (default "heptio-ark") - --stderrthreshold severity logs at or above this threshold go to stderr (default 2) - -v, --v Level log level for V logs - --vmodule moduleSpec comma-separated list of pattern=N settings for file-filtered logging -``` - -### SEE ALSO -* [ark restic repo](ark_restic_repo.md) - Work with restic repositories - diff --git a/pkg/cmd/cli/restic/repo/init.go b/pkg/cmd/cli/restic/repo/init.go deleted file mode 100644 index 7b3f88f80..000000000 --- a/pkg/cmd/cli/restic/repo/init.go +++ /dev/null @@ -1,165 +0,0 @@ -/* -Copyright 2018 the Heptio Ark contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package repo - -import ( - "crypto/rand" - "time" - - "github.com/pkg/errors" - "github.com/spf13/cobra" - "github.com/spf13/pflag" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kclientset "k8s.io/client-go/kubernetes" - - "github.com/heptio/ark/pkg/apis/ark/v1" - "github.com/heptio/ark/pkg/client" - "github.com/heptio/ark/pkg/cmd" - clientset "github.com/heptio/ark/pkg/generated/clientset/versioned" - "github.com/heptio/ark/pkg/restic" - "github.com/heptio/ark/pkg/util/filesystem" -) - -func NewInitCommand(f client.Factory) *cobra.Command { - o := NewInitRepositoryOptions() - - c := &cobra.Command{ - Use: "init NAMESPACE", - Short: "initialize a restic repository for a specified namespace", - Long: "initialize a restic repository for a specified namespace", - Args: cobra.ExactArgs(1), - Run: func(c *cobra.Command, args []string) { - cmd.CheckError(o.Complete(f, args)) - cmd.CheckError(o.Validate(f)) - cmd.CheckError(o.Run(f)) - }, - } - - o.BindFlags(c.Flags()) - - return c -} - -type InitRepositoryOptions struct { - Namespace string - KeyFile string - KeyData string - KeySize int - MaintenanceFrequency time.Duration - - fileSystem filesystem.Interface - kubeClient kclientset.Interface - arkClient clientset.Interface - keyBytes []byte -} - -func NewInitRepositoryOptions() *InitRepositoryOptions { - return &InitRepositoryOptions{ - KeySize: 1024, - MaintenanceFrequency: restic.DefaultMaintenanceFrequency, - fileSystem: filesystem.NewFileSystem(), - } -} - -var ( - errKeyFileAndKeyDataProvided = errors.Errorf("only one of --key-file and --key-data may be specified") - errKeySizeTooSmall = errors.Errorf("--key-size must be at least 1") -) - -func (o *InitRepositoryOptions) BindFlags(flags *pflag.FlagSet) { - flags.StringVar(&o.KeyFile, "key-file", o.KeyFile, "Path to file containing the encryption key for the restic repository. Optional; if unset, Ark will generate a random key for you.") - flags.StringVar(&o.KeyData, "key-data", o.KeyData, "Encryption key for the restic repository. Optional; if unset, Ark will generate a random key for you.") - flags.IntVar(&o.KeySize, "key-size", o.KeySize, "Size of the generated key for the restic repository") - flags.DurationVar(&o.MaintenanceFrequency, "maintenance-frequency", o.MaintenanceFrequency, "How often maintenance (i.e. restic prune & check) is run on the repository") -} - -func (o *InitRepositoryOptions) Complete(f client.Factory, args []string) error { - if o.KeyFile != "" && o.KeyData != "" { - return errKeyFileAndKeyDataProvided - } - - if o.KeyFile == "" && o.KeyData == "" && o.KeySize < 1 { - return errKeySizeTooSmall - } - - o.Namespace = args[0] - - switch { - case o.KeyFile != "": - data, err := o.fileSystem.ReadFile(o.KeyFile) - if err != nil { - return err - } - o.keyBytes = data - case o.KeyData != "": - o.keyBytes = []byte(o.KeyData) - case o.KeySize > 0: - o.keyBytes = make([]byte, o.KeySize) - // rand.Reader always returns a nil error - rand.Read(o.keyBytes) - } - - return nil -} - -func (o *InitRepositoryOptions) Validate(f client.Factory) error { - if len(o.keyBytes) == 0 { - return errors.Errorf("keyBytes is required") - } - - if o.MaintenanceFrequency <= 0 { - return errors.Errorf("--maintenance-frequency must be greater than zero") - } - - kubeClient, err := f.KubeClient() - if err != nil { - return err - } - o.kubeClient = kubeClient - - if _, err := kubeClient.CoreV1().Namespaces().Get(o.Namespace, metav1.GetOptions{}); err != nil { - return err - } - - arkClient, err := f.Client() - if err != nil { - return err - } - o.arkClient = arkClient - - return nil -} - -func (o *InitRepositoryOptions) Run(f client.Factory) error { - if err := restic.NewRepositoryKey(o.kubeClient.CoreV1(), o.Namespace, o.keyBytes); err != nil { - return err - } - - repo := &v1.ResticRepository{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: f.Namespace(), - Name: o.Namespace, - }, - Spec: v1.ResticRepositorySpec{ - MaintenanceFrequency: metav1.Duration{Duration: o.MaintenanceFrequency}, - }, - } - - _, err := o.arkClient.ArkV1().ResticRepositories(f.Namespace()).Create(repo) - return errors.Wrap(err, "error creating ResticRepository") -} diff --git a/pkg/cmd/cli/restic/repo/init_test.go b/pkg/cmd/cli/restic/repo/init_test.go deleted file mode 100644 index eea504964..000000000 --- a/pkg/cmd/cli/restic/repo/init_test.go +++ /dev/null @@ -1,88 +0,0 @@ -/* -Copyright 2018 the Heptio Ark contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package repo - -import ( - "testing" - - "github.com/spf13/pflag" - "github.com/stretchr/testify/assert" - - "k8s.io/client-go/kubernetes" - - "github.com/heptio/ark/pkg/client" - clientset "github.com/heptio/ark/pkg/generated/clientset/versioned" - arktest "github.com/heptio/ark/pkg/util/test" -) - -type fakeFactory struct{} - -var _ client.Factory = &fakeFactory{} - -func (f *fakeFactory) BindFlags(flags *pflag.FlagSet) { - panic("not implemented") -} - -func (f *fakeFactory) Client() (clientset.Interface, error) { - panic("not implemented") -} - -func (f *fakeFactory) KubeClient() (kubernetes.Interface, error) { - panic("not implemented") -} - -func (f *fakeFactory) Namespace() string { - return "" -} - -func TestComplete(t *testing.T) { - // no key options provided should error - o := &InitRepositoryOptions{} - err := o.Complete(&fakeFactory{}, []string{"ns"}) - assert.EqualError(t, err, errKeySizeTooSmall.Error()) - - // both KeyFile and KeyData provided should error - o = &InitRepositoryOptions{ - KeyFile: "/foo", - KeyData: "bar", - } - err = o.Complete(&fakeFactory{}, []string{"ns"}) - assert.EqualError(t, err, errKeyFileAndKeyDataProvided.Error()) - - // if KeyFile is provided, its contents are used - fileContents := []byte("bar") - o = &InitRepositoryOptions{ - KeyFile: "/foo", - fileSystem: arktest.NewFakeFileSystem().WithFile("/foo", fileContents), - } - assert.NoError(t, o.Complete(&fakeFactory{}, []string{"ns"})) - assert.Equal(t, fileContents, o.keyBytes) - - // if KeyData is provided, it's used - o = &InitRepositoryOptions{ - KeyData: "bar", - } - assert.NoError(t, o.Complete(&fakeFactory{}, []string{"ns"})) - assert.Equal(t, []byte(o.KeyData), o.keyBytes) - - // if KeySize is provided, a random key is generated - o = &InitRepositoryOptions{ - KeySize: 10, - } - assert.NoError(t, o.Complete(&fakeFactory{}, []string{"ns"})) - assert.Equal(t, o.KeySize, len(o.keyBytes)) -} diff --git a/pkg/cmd/cli/restic/repo/repo.go b/pkg/cmd/cli/restic/repo/repo.go index ff7bb3b37..e8b9bcaeb 100644 --- a/pkg/cmd/cli/restic/repo/repo.go +++ b/pkg/cmd/cli/restic/repo/repo.go @@ -30,7 +30,6 @@ func NewRepositoryCommand(f client.Factory) *cobra.Command { } c.AddCommand( - NewInitCommand(f), NewGetCommand(f, "get"), ) diff --git a/pkg/cmd/cli/restic/server.go b/pkg/cmd/cli/restic/server.go index 747baeb0f..c0ab1297c 100644 --- a/pkg/cmd/cli/restic/server.go +++ b/pkg/cmd/cli/restic/server.go @@ -24,6 +24,7 @@ import ( "github.com/heptio/ark/pkg/controller" clientset "github.com/heptio/ark/pkg/generated/clientset/versioned" informers "github.com/heptio/ark/pkg/generated/informers/externalversions" + "github.com/heptio/ark/pkg/restic" "github.com/heptio/ark/pkg/util/logging" ) @@ -59,6 +60,7 @@ type resticServer struct { arkInformerFactory informers.SharedInformerFactory kubeInformerFactory kubeinformers.SharedInformerFactory podInformer cache.SharedIndexInformer + secretInformer cache.SharedIndexInformer logger logrus.FieldLogger ctx context.Context cancelFunc context.CancelFunc @@ -84,7 +86,7 @@ func newResticServer(logger logrus.FieldLogger, baseName string) (*resticServer, // filter to only pods scheduled on this node. podInformer := corev1informers.NewFilteredPodInformer( kubeClient, - "", + metav1.NamespaceAll, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, func(opts *metav1.ListOptions) { @@ -92,6 +94,22 @@ func newResticServer(logger logrus.FieldLogger, baseName string) (*resticServer, }, ) + // use a stand-alone secrets informer so we can filter to only the restic credentials + // secret(s) within the heptio-ark namespace + // + // note: using an informer to access the single secret for all ark-managed + // restic repositories is overkill for now, but will be useful when we move + // to fully-encrypted backups and have unique keys per repository. + secretInformer := corev1informers.NewFilteredSecretInformer( + kubeClient, + os.Getenv("HEPTIO_ARK_NAMESPACE"), + 0, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + func(opts *metav1.ListOptions) { + opts.FieldSelector = fmt.Sprintf("metadata.name=%s", restic.CredentialsSecretName) + }, + ) + ctx, cancelFunc := context.WithCancel(context.Background()) return &resticServer{ @@ -100,6 +118,7 @@ func newResticServer(logger logrus.FieldLogger, baseName string) (*resticServer, arkInformerFactory: informers.NewFilteredSharedInformerFactory(arkClient, 0, os.Getenv("HEPTIO_ARK_NAMESPACE"), nil), kubeInformerFactory: kubeinformers.NewSharedInformerFactory(kubeClient, 0), podInformer: podInformer, + secretInformer: secretInformer, logger: logger, ctx: ctx, cancelFunc: cancelFunc, @@ -118,7 +137,7 @@ func (s *resticServer) run() { s.arkInformerFactory.Ark().V1().PodVolumeBackups(), s.arkClient.ArkV1(), s.podInformer, - s.kubeInformerFactory.Core().V1().Secrets(), + s.secretInformer, s.kubeInformerFactory.Core().V1().PersistentVolumeClaims(), os.Getenv("NODE_NAME"), ) @@ -133,7 +152,7 @@ func (s *resticServer) run() { s.arkInformerFactory.Ark().V1().PodVolumeRestores(), s.arkClient.ArkV1(), s.podInformer, - s.kubeInformerFactory.Core().V1().Secrets(), + s.secretInformer, s.kubeInformerFactory.Core().V1().PersistentVolumeClaims(), os.Getenv("NODE_NAME"), ) @@ -146,6 +165,7 @@ func (s *resticServer) run() { go s.arkInformerFactory.Start(s.ctx.Done()) go s.kubeInformerFactory.Start(s.ctx.Done()) go s.podInformer.Run(s.ctx.Done()) + go s.secretInformer.Run(s.ctx.Done()) s.logger.Info("Controllers started successfully") diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 28094665d..d7a3f1ef5 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -235,14 +235,6 @@ func (s *server) run() error { if err := s.initRestic(config.BackupStorageProvider); err != nil { return err } - - // warn if restic daemonset does not exist - _, err := s.kubeClient.AppsV1().DaemonSets(s.namespace).Get("restic", metav1.GetOptions{}) - if apierrors.IsNotFound(err) { - s.logger.Warn("Ark restic DaemonSet not found; restic backups will fail until it's created") - } else if err != nil { - return errors.WithStack(err) - } } if err := s.runControllers(config); err != nil { @@ -457,15 +449,33 @@ func durationMin(a, b time.Duration) time.Duration { } func (s *server) initRestic(config api.ObjectStorageProviderConfig) error { + // warn if restic daemonset does not exist + if _, err := s.kubeClient.AppsV1().DaemonSets(s.namespace).Get(restic.DaemonSet, metav1.GetOptions{}); apierrors.IsNotFound(err) { + s.logger.Warn("Ark restic daemonset not found; restic backups/restores will not work until it's created") + } else if err != nil { + s.logger.WithError(errors.WithStack(err)).Warn("Error checking for existence of ark restic daemonset") + } + + // ensure the repo key secret is set up + if err := restic.EnsureCommonRepositoryKey(s.kubeClient.CoreV1(), s.namespace); err != nil { + return err + } + // set the env vars that restic uses for creds purposes if config.Name == string(restic.AzureBackend) { os.Setenv("AZURE_ACCOUNT_NAME", os.Getenv("AZURE_STORAGE_ACCOUNT_ID")) os.Setenv("AZURE_ACCOUNT_KEY", os.Getenv("AZURE_STORAGE_KEY")) } + // use a stand-alone secrets informer so we can filter to only the restic credentials + // secret(s) within the heptio-ark namespace + // + // note: using an informer to access the single secret for all ark-managed + // restic repositories is overkill for now, but will be useful when we move + // to fully-encrypted backups and have unique keys per repository. secretsInformer := corev1informers.NewFilteredSecretInformer( s.kubeClient, - "", + s.namespace, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, func(opts *metav1.ListOptions) { @@ -479,8 +489,8 @@ func (s *server) initRestic(config api.ObjectStorageProviderConfig) error { s.namespace, s.arkClient, secretsInformer, - s.kubeClient.CoreV1(), s.sharedInformerFactory.Ark().V1().ResticRepositories(), + s.arkClient.ArkV1(), s.logger, ) if err != nil { diff --git a/pkg/controller/backup_deletion_controller.go b/pkg/controller/backup_deletion_controller.go index 4d8d91f3d..aeae503b6 100644 --- a/pkg/controller/backup_deletion_controller.go +++ b/pkg/controller/backup_deletion_controller.go @@ -17,6 +17,7 @@ limitations under the License. package controller import ( + "context" "encoding/json" "time" @@ -39,6 +40,8 @@ import ( "k8s.io/client-go/tools/cache" ) +const resticTimeout = time.Minute + type backupDeletionController struct { *genericController @@ -239,13 +242,9 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e // Try to delete restic snapshots log.Info("Removing restic snapshots") - if snapshots, err := restic.GetSnapshotsInBackup(backup, c.podvolumeBackupLister); err != nil { - errs = append(errs, err.Error()) - } else { - for _, snapshot := range snapshots { - if err := c.resticMgr.Forget(snapshot); err != nil { - errs = append(errs, err.Error()) - } + if deleteErrs := c.deleteResticSnapshots(backup); len(deleteErrs) > 0 { + for _, err := range deleteErrs { + errs = append(errs, err.Error()) } } @@ -304,6 +303,29 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e return nil } +func (c *backupDeletionController) deleteResticSnapshots(backup *v1.Backup) []error { + if c.resticMgr == nil { + return nil + } + + snapshots, err := restic.GetSnapshotsInBackup(backup, c.podvolumeBackupLister) + if err != nil { + return []error{err} + } + + ctx, cancelFunc := context.WithTimeout(context.Background(), resticTimeout) + defer cancelFunc() + + var errs []error + for _, snapshot := range snapshots { + if err := c.resticMgr.Forget(ctx, snapshot); err != nil { + errs = append(errs, err) + } + } + + return errs +} + const deleteBackupRequestMaxAge = 24 * time.Hour func (c *backupDeletionController) deleteExpiredRequests() { diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index 4d434a7bf..639d113b0 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -60,7 +60,7 @@ func NewPodVolumeBackupController( podVolumeBackupInformer informers.PodVolumeBackupInformer, podVolumeBackupClient arkv1client.PodVolumeBackupsGetter, podInformer cache.SharedIndexInformer, - secretInformer corev1informers.SecretInformer, + secretInformer cache.SharedIndexInformer, pvcInformer corev1informers.PersistentVolumeClaimInformer, nodeName string, ) Interface { @@ -69,7 +69,7 @@ func NewPodVolumeBackupController( podVolumeBackupClient: podVolumeBackupClient, podVolumeBackupLister: podVolumeBackupInformer.Lister(), podLister: corev1listers.NewPodLister(podInformer.GetIndexer()), - secretLister: secretInformer.Lister(), + secretLister: corev1listers.NewSecretLister(secretInformer.GetIndexer()), pvcLister: pvcInformer.Lister(), nodeName: nodeName, } @@ -78,8 +78,8 @@ func NewPodVolumeBackupController( c.cacheSyncWaiters = append( c.cacheSyncWaiters, podVolumeBackupInformer.Informer().HasSynced, - secretInformer.Informer().HasSynced, podInformer.HasSynced, + secretInformer.HasSynced, pvcInformer.Informer().HasSynced, ) c.processBackupFunc = c.processBackup @@ -194,7 +194,7 @@ func (c *podVolumeBackupController) processBackup(req *arkv1api.PodVolumeBackup) log.WithField("path", path).Debugf("Found path matching glob") // temp creds - file, err := restic.TempCredentialsFile(c.secretLister, req.Spec.Pod.Namespace) + file, err := restic.TempCredentialsFile(c.secretLister, req.Namespace, req.Spec.Pod.Namespace) if err != nil { log.WithError(err).Error("Error creating temp restic credentials file") return c.fail(req, errors.Wrap(err, "error creating temp restic credentials file").Error(), log) diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index 823a8e363..4a615fb60 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -50,8 +50,8 @@ type podVolumeRestoreController struct { podVolumeRestoreClient arkv1client.PodVolumeRestoresGetter podVolumeRestoreLister listers.PodVolumeRestoreLister - secretLister corev1listers.SecretLister podLister corev1listers.PodLister + secretLister corev1listers.SecretLister pvcLister corev1listers.PersistentVolumeClaimLister nodeName string @@ -64,7 +64,7 @@ func NewPodVolumeRestoreController( podVolumeRestoreInformer informers.PodVolumeRestoreInformer, podVolumeRestoreClient arkv1client.PodVolumeRestoresGetter, podInformer cache.SharedIndexInformer, - secretInformer corev1informers.SecretInformer, + secretInformer cache.SharedIndexInformer, pvcInformer corev1informers.PersistentVolumeClaimInformer, nodeName string, ) Interface { @@ -73,7 +73,7 @@ func NewPodVolumeRestoreController( podVolumeRestoreClient: podVolumeRestoreClient, podVolumeRestoreLister: podVolumeRestoreInformer.Lister(), podLister: corev1listers.NewPodLister(podInformer.GetIndexer()), - secretLister: secretInformer.Lister(), + secretLister: corev1listers.NewSecretLister(secretInformer.GetIndexer()), pvcLister: pvcInformer.Lister(), nodeName: nodeName, } @@ -82,8 +82,8 @@ func NewPodVolumeRestoreController( c.cacheSyncWaiters = append( c.cacheSyncWaiters, podVolumeRestoreInformer.Informer().HasSynced, - secretInformer.Informer().HasSynced, podInformer.HasSynced, + secretInformer.HasSynced, pvcInformer.Informer().HasSynced, ) c.processRestoreFunc = c.processRestore @@ -270,7 +270,7 @@ func (c *podVolumeRestoreController) processRestore(req *arkv1api.PodVolumeResto return c.failRestore(req, errors.Wrap(err, "error getting volume directory name").Error(), log) } - credsFile, err := restic.TempCredentialsFile(c.secretLister, req.Spec.Pod.Namespace) + credsFile, err := restic.TempCredentialsFile(c.secretLister, req.Namespace, req.Spec.Pod.Namespace) if err != nil { log.WithError(err).Error("Error creating temp restic credentials file") return c.failRestore(req, errors.Wrap(err, "error creating temp restic credentials file").Error(), log) diff --git a/pkg/controller/restic_repository_controller.go b/pkg/controller/restic_repository_controller.go index 16378e60f..783baa915 100644 --- a/pkg/controller/restic_repository_controller.go +++ b/pkg/controller/restic_repository_controller.go @@ -148,7 +148,7 @@ func (c *resticRepositoryController) initializeRepo(req *v1.ResticRepository, lo return err } - if err := ensureRepo(req.Name, c.repositoryManager); err != nil { + if err := ensureRepo(req.Name, req.Spec.ResticIdentifier, c.repositoryManager); err != nil { return c.patchResticRepository(req, repoNotReady(err.Error())) } @@ -160,12 +160,12 @@ func (c *resticRepositoryController) initializeRepo(req *v1.ResticRepository, lo // ensureRepo first checks the repo, and returns if check passes. If it fails, // attempts to init the repo, and returns the result. -func ensureRepo(name string, repoManager restic.RepositoryManager) error { - if repoManager.CheckRepo(name) == nil { +func ensureRepo(name, identifier string, repoManager restic.RepositoryManager) error { + if repoManager.CheckRepo(name, identifier) == nil { return nil } - return repoManager.InitRepo(name) + return repoManager.InitRepo(name, identifier) } func (c *resticRepositoryController) runMaintenanceIfDue(req *v1.ResticRepository, log logrus.FieldLogger) error { @@ -181,14 +181,14 @@ func (c *resticRepositoryController) runMaintenanceIfDue(req *v1.ResticRepositor log.Info("Running maintenance on restic repository") log.Debug("Checking repo before prune") - if err := c.repositoryManager.CheckRepo(req.Name); err != nil { + if err := c.repositoryManager.CheckRepo(req.Name, req.Spec.ResticIdentifier); err != nil { return c.patchResticRepository(req, repoNotReady(err.Error())) } // prune failures should be displayed in the `.status.message` field but // should not cause the repo to move to `NotReady`. log.Debug("Pruning repo") - if err := c.repositoryManager.PruneRepo(req.Name); err != nil { + if err := c.repositoryManager.PruneRepo(req.Name, req.Spec.ResticIdentifier); err != nil { log.WithError(err).Warn("error pruning repository") if patchErr := c.patchResticRepository(req, func(r *v1.ResticRepository) { r.Status.Message = err.Error() @@ -198,7 +198,7 @@ func (c *resticRepositoryController) runMaintenanceIfDue(req *v1.ResticRepositor } log.Debug("Checking repo after prune") - if err := c.repositoryManager.CheckRepo(req.Name); err != nil { + if err := c.repositoryManager.CheckRepo(req.Name, req.Spec.ResticIdentifier); err != nil { return c.patchResticRepository(req, repoNotReady(err.Error())) } @@ -216,7 +216,7 @@ func (c *resticRepositoryController) checkNotReadyRepo(req *v1.ResticRepository, // we need to ensure it (first check, if check fails, attempt to init) // because we don't know if it's been successfully initialized yet. - if err := ensureRepo(req.Name, c.repositoryManager); err != nil { + if err := ensureRepo(req.Name, req.Spec.ResticIdentifier, c.repositoryManager); err != nil { return c.patchResticRepository(req, repoNotReady(err.Error())) } diff --git a/pkg/restic/backupper.go b/pkg/restic/backupper.go index ec8672c33..f05a3aea9 100644 --- a/pkg/restic/backupper.go +++ b/pkg/restic/backupper.go @@ -25,12 +25,10 @@ import ( "github.com/sirupsen/logrus" corev1api "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" - arkv1listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" "github.com/heptio/ark/pkg/util/boolptr" ) @@ -43,7 +41,7 @@ type Backupper interface { type backupper struct { ctx context.Context repoManager *repositoryManager - repoLister arkv1listers.ResticRepositoryLister + repoEnsurer *repositoryEnsurer results map[string]chan *arkv1api.PodVolumeBackup resultsLock sync.Mutex @@ -52,13 +50,14 @@ type backupper struct { func newBackupper( ctx context.Context, repoManager *repositoryManager, + repoEnsurer *repositoryEnsurer, podVolumeBackupInformer cache.SharedIndexInformer, - repoLister arkv1listers.ResticRepositoryLister, + log logrus.FieldLogger, ) *backupper { b := &backupper{ ctx: ctx, repoManager: repoManager, - repoLister: repoLister, + repoEnsurer: repoEnsurer, results: make(map[string]chan *arkv1api.PodVolumeBackup), } @@ -70,8 +69,14 @@ func newBackupper( if pvb.Status.Phase == arkv1api.PodVolumeBackupPhaseCompleted || pvb.Status.Phase == arkv1api.PodVolumeBackupPhaseFailed { b.resultsLock.Lock() - b.results[resultsKey(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name)] <- pvb - b.resultsLock.Unlock() + defer b.resultsLock.Unlock() + + resChan, ok := b.results[resultsKey(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name)] + if !ok { + log.Errorf("No results channel found for pod %s/%s to send pod volume backup %s/%s on", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name, pvb.Namespace, pvb.Name) + return + } + resChan <- pvb } }, }, @@ -84,31 +89,6 @@ func resultsKey(ns, name string) string { return fmt.Sprintf("%s/%s", ns, name) } -func getRepo(repoLister arkv1listers.ResticRepositoryLister, ns, name string) (*arkv1api.ResticRepository, error) { - repo, err := repoLister.ResticRepositories(ns).Get(name) - if apierrors.IsNotFound(err) { - return nil, errors.Wrapf(err, "restic repository not found") - } - if err != nil { - return nil, errors.Wrapf(err, "error getting restic repository") - } - - return repo, nil -} - -func getReadyRepo(repoLister arkv1listers.ResticRepositoryLister, ns, name string) (*arkv1api.ResticRepository, error) { - repo, err := getRepo(repoLister, ns, name) - if err != nil { - return nil, err - } - - if repo.Status.Phase != arkv1api.ResticRepositoryPhaseReady { - return nil, errors.New("restic repository not ready") - } - - return repo, nil -} - func (b *backupper) BackupPodVolumes(backup *arkv1api.Backup, pod *corev1api.Pod, log logrus.FieldLogger) (map[string]string, []error) { // get volumes to backup from pod's annotations volumesToBackup := GetVolumesToBackup(pod) @@ -116,11 +96,16 @@ func (b *backupper) BackupPodVolumes(backup *arkv1api.Backup, pod *corev1api.Pod return nil, nil } - repo, err := getReadyRepo(b.repoLister, backup.Namespace, pod.Namespace) + repo, err := b.repoEnsurer.EnsureRepo(b.ctx, backup.Namespace, pod.Namespace) if err != nil { return nil, []error{err} } + // get a single non-exclusive lock since we'll wait for all individual + // backups to be complete before releasing it. + b.repoManager.repoLocker.Lock(pod.Namespace) + defer b.repoManager.repoLocker.Unlock(pod.Namespace) + resultsChan := make(chan *arkv1api.PodVolumeBackup) b.resultsLock.Lock() @@ -133,9 +118,6 @@ func (b *backupper) BackupPodVolumes(backup *arkv1api.Backup, pod *corev1api.Pod ) for _, volumeName := range volumesToBackup { - b.repoManager.repoLocker.Lock(pod.Namespace) - defer b.repoManager.repoLocker.Unlock(pod.Namespace) - volumeBackup := newPodVolumeBackup(backup, pod, volumeName, repo.Spec.ResticIdentifier) if err := errorOnly(b.repoManager.arkClient.ArkV1().PodVolumeBackups(volumeBackup.Namespace).Create(volumeBackup)); err != nil { diff --git a/pkg/restic/command_factory.go b/pkg/restic/command_factory.go index df6fcaa8d..e460911ca 100644 --- a/pkg/restic/command_factory.go +++ b/pkg/restic/command_factory.go @@ -1,3 +1,19 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package restic import ( diff --git a/pkg/restic/common.go b/pkg/restic/common.go index b6dd3ebaa..08b606e42 100644 --- a/pkg/restic/common.go +++ b/pkg/restic/common.go @@ -33,6 +33,7 @@ import ( ) const ( + DaemonSet = "restic" InitContainer = "restic-wait" DefaultMaintenanceFrequency = 24 * time.Hour @@ -143,9 +144,14 @@ func GetSnapshotsInBackup(backup *arkv1api.Backup, podVolumeBackupLister arkv1li // encryption key for the given repo and returns its path. The // caller should generally call os.Remove() to remove the file // when done with it. -func TempCredentialsFile(secretLister corev1listers.SecretLister, repoName string) (string, error) { +func TempCredentialsFile(secretLister corev1listers.SecretLister, arkNamespace, repoName string) (string, error) { secretGetter := NewListerSecretGetter(secretLister) - repoKey, err := GetRepositoryKey(secretGetter, repoName) + + // For now, all restic repos share the same key so we don't need the repoName to fetch it. + // When we move to full-backup encryption, we'll likely have a separate key per restic repo + // (all within the Ark server's namespace) so GetRepositoryKey will need to take the repo + // name as an argument as well. + repoKey, err := GetRepositoryKey(secretGetter, arkNamespace) if err != nil { return "", err } diff --git a/pkg/restic/exec_commands.go b/pkg/restic/exec_commands.go index cb5937135..3f2bbf81d 100644 --- a/pkg/restic/exec_commands.go +++ b/pkg/restic/exec_commands.go @@ -1,3 +1,19 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package restic import ( diff --git a/pkg/restic/repository_ensurer.go b/pkg/restic/repository_ensurer.go new file mode 100644 index 000000000..448a1e3c4 --- /dev/null +++ b/pkg/restic/repository_ensurer.go @@ -0,0 +1,122 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package restic + +import ( + "context" + "sync" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + + arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" + arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" + arkv1informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" + arkv1listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" +) + +// repositoryEnsurer ensures that Ark restic repositories are created and ready. +type repositoryEnsurer struct { + repoLister arkv1listers.ResticRepositoryLister + repoClient arkv1client.ResticRepositoriesGetter + + readyChansLock sync.Mutex + readyChans map[string]chan *arkv1api.ResticRepository +} + +func newRepositoryEnsurer(repoInformer arkv1informers.ResticRepositoryInformer, repoClient arkv1client.ResticRepositoriesGetter, log logrus.FieldLogger) *repositoryEnsurer { + r := &repositoryEnsurer{ + repoLister: repoInformer.Lister(), + repoClient: repoClient, + readyChans: make(map[string]chan *arkv1api.ResticRepository), + } + + repoInformer.Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(old, upd interface{}) { + oldObj := old.(*arkv1api.ResticRepository) + newObj := upd.(*arkv1api.ResticRepository) + + if oldObj.Status.Phase != arkv1api.ResticRepositoryPhaseReady && newObj.Status.Phase == arkv1api.ResticRepositoryPhaseReady { + r.readyChansLock.Lock() + defer r.readyChansLock.Unlock() + + readyChan, ok := r.readyChans[newObj.Name] + if !ok { + log.Errorf("No ready channel found for repository %s/%s", newObj.Namespace, newObj.Name) + return + } + + readyChan <- newObj + delete(r.readyChans, newObj.Name) + } + }, + }, + ) + + return r +} + +func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, name string) (*arkv1api.ResticRepository, error) { + if repo, err := r.repoLister.ResticRepositories(namespace).Get(name); err != nil && !apierrors.IsNotFound(err) { + return nil, errors.WithStack(err) + } else if err == nil { + if repo.Status.Phase != arkv1api.ResticRepositoryPhaseReady { + return nil, errors.New("restic repository is not ready") + } + return repo, nil + } + + // if we're here, it means we got an IsNotFound error, meaning we need to create a new + // repo and wait for it to be ready + + repo := &arkv1api.ResticRepository{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: arkv1api.ResticRepositorySpec{ + MaintenanceFrequency: metav1.Duration{Duration: DefaultMaintenanceFrequency}, + }, + } + + readyChan := r.getReadyChan(name) + defer close(readyChan) + + if _, err := r.repoClient.ResticRepositories(namespace).Create(repo); err != nil { + return nil, errors.Wrapf(err, "unable to create restic repository resource") + } + + select { + case <-ctx.Done(): + return nil, errors.New("timed out waiting for restic repository to become ready") + case res := <-readyChan: + return res, nil + } +} + +func (r *repositoryEnsurer) getReadyChan(name string) chan *arkv1api.ResticRepository { + r.readyChansLock.Lock() + defer r.readyChansLock.Unlock() + + r.readyChans[name] = make(chan *arkv1api.ResticRepository) + return r.readyChans[name] +} diff --git a/pkg/restic/repository_keys.go b/pkg/restic/repository_keys.go index 705f31c8e..67a3144f1 100644 --- a/pkg/restic/repository_keys.go +++ b/pkg/restic/repository_keys.go @@ -1,8 +1,26 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package restic import ( "github.com/pkg/errors" + corev1api "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" corev1listers "k8s.io/client-go/listers/core/v1" @@ -10,10 +28,22 @@ import ( const ( CredentialsSecretName = "ark-restic-credentials" - CredentialsKey = "ark-restic-credentials" + CredentialsKey = "repository-password" + + encryptionKey = "static-passw0rd" ) -func NewRepositoryKey(secretClient corev1client.SecretsGetter, namespace string, data []byte) error { +func EnsureCommonRepositoryKey(secretClient corev1client.SecretsGetter, namespace string) error { + _, err := secretClient.Secrets(namespace).Get(CredentialsSecretName, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return errors.WithStack(err) + } + if err == nil { + return nil + } + + // if we got here, we got an IsNotFound error, so we need to create the key + secret := &corev1api.Secret{ ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, @@ -21,13 +51,12 @@ func NewRepositoryKey(secretClient corev1client.SecretsGetter, namespace string, }, Type: corev1api.SecretTypeOpaque, Data: map[string][]byte{ - CredentialsKey: data, + CredentialsKey: []byte(encryptionKey), }, } - _, err := secretClient.Secrets(namespace).Create(secret) - if err != nil { - return errors.WithStack(err) + if _, err = secretClient.Secrets(namespace).Create(secret); err != nil { + return errors.Wrapf(err, "error creating %s secret", CredentialsSecretName) } return nil diff --git a/pkg/restic/repository_manager.go b/pkg/restic/repository_manager.go index b714eac4d..fefa97be5 100644 --- a/pkg/restic/repository_manager.go +++ b/pkg/restic/repository_manager.go @@ -25,12 +25,12 @@ import ( "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" clientset "github.com/heptio/ark/pkg/generated/clientset/versioned" + arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" arkv1informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" arkv1listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" arkexec "github.com/heptio/ark/pkg/util/exec" @@ -38,18 +38,18 @@ import ( // RepositoryManager executes commands against restic repositories. type RepositoryManager interface { - // InitRepo initializes a repo with the specified name. - InitRepo(name string) error + // InitRepo initializes a repo with the specified name and identifier. + InitRepo(name, identifier string) error // CheckRepo checks the specified repo for errors. - CheckRepo(name string) error + CheckRepo(name, identifier string) error // PruneRepo deletes unused data from a repo. - PruneRepo(name string) error + PruneRepo(name, identifier string) error // Forget removes a snapshot from the list of // available snapshots in a repo. - Forget(snapshot SnapshotIdentifier) error + Forget(context.Context, SnapshotIdentifier) error BackupperFactory @@ -74,11 +74,11 @@ type repositoryManager struct { namespace string arkClient clientset.Interface secretsLister corev1listers.SecretLister - secretsClient corev1client.SecretsGetter repoLister arkv1listers.ResticRepositoryLister repoInformerSynced cache.InformerSynced log logrus.FieldLogger repoLocker *repoLocker + repoEnsurer *repositoryEnsurer } // NewRepositoryManager constructs a RepositoryManager. @@ -87,19 +87,20 @@ func NewRepositoryManager( namespace string, arkClient clientset.Interface, secretsInformer cache.SharedIndexInformer, - secretsClient corev1client.SecretsGetter, repoInformer arkv1informers.ResticRepositoryInformer, + repoClient arkv1client.ResticRepositoriesGetter, log logrus.FieldLogger, ) (RepositoryManager, error) { rm := &repositoryManager{ namespace: namespace, arkClient: arkClient, secretsLister: corev1listers.NewSecretLister(secretsInformer.GetIndexer()), - secretsClient: secretsClient, repoLister: repoInformer.Lister(), repoInformerSynced: repoInformer.Informer().HasSynced, log: log, - repoLocker: newRepoLocker(), + + repoLocker: newRepoLocker(), + repoEnsurer: newRepositoryEnsurer(repoInformer, repoClient, log), } if !cache.WaitForCacheSync(ctx.Done(), secretsInformer.HasSynced) { @@ -120,7 +121,7 @@ func (rm *repositoryManager) NewBackupper(ctx context.Context, backup *arkv1api. }, ) - b := newBackupper(ctx, rm, informer, rm.repoLister) + b := newBackupper(ctx, rm, rm.repoEnsurer, informer, rm.log) go informer.Run(ctx.Done()) if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced, rm.repoInformerSynced) { @@ -141,72 +142,63 @@ func (rm *repositoryManager) NewRestorer(ctx context.Context, restore *arkv1api. }, ) - r := newRestorer(ctx, rm, informer, rm.repoLister) + r := newRestorer(ctx, rm, rm.repoEnsurer, informer, rm.log) go informer.Run(ctx.Done()) - if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { + if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced, rm.repoInformerSynced) { return nil, errors.New("timed out waiting for cache to sync") } return r, nil } -func (rm *repositoryManager) InitRepo(name string) error { - repo, err := getRepo(rm.repoLister, rm.namespace, name) - if err != nil { - return err - } - +func (rm *repositoryManager) InitRepo(name, identifier string) error { + // restic init requires an exclusive lock rm.repoLocker.LockExclusive(name) defer rm.repoLocker.UnlockExclusive(name) - return rm.exec(InitCommand(repo.Spec.ResticIdentifier)) + return rm.exec(InitCommand(identifier)) } -func (rm *repositoryManager) CheckRepo(name string) error { - repo, err := getRepo(rm.repoLister, rm.namespace, name) - if err != nil { - return err - } - +func (rm *repositoryManager) CheckRepo(name, identifier string) error { + // restic check requires an exclusive lock rm.repoLocker.LockExclusive(name) defer rm.repoLocker.UnlockExclusive(name) - cmd := CheckCommand(repo.Spec.ResticIdentifier) - - return rm.exec(cmd) + return rm.exec(CheckCommand(identifier)) } -func (rm *repositoryManager) PruneRepo(name string) error { - repo, err := getReadyRepo(rm.repoLister, rm.namespace, name) - if err != nil { - return err - } - +func (rm *repositoryManager) PruneRepo(name, identifier string) error { + // restic prune requires an exclusive lock rm.repoLocker.LockExclusive(name) defer rm.repoLocker.UnlockExclusive(name) - cmd := PruneCommand(repo.Spec.ResticIdentifier) - - return rm.exec(cmd) + return rm.exec(PruneCommand(identifier)) } -func (rm *repositoryManager) Forget(snapshot SnapshotIdentifier) error { - repo, err := getReadyRepo(rm.repoLister, rm.namespace, snapshot.Repo) +func (rm *repositoryManager) Forget(ctx context.Context, snapshot SnapshotIdentifier) error { + // We can't wait for this in the constructor, because this informer is coming + // from the shared informer factory, which isn't started until *after* the repo + // manager is instantiated & passed to the controller constructors. We'd get a + // deadlock if we tried to wait for this in the constructor. + if !cache.WaitForCacheSync(ctx.Done(), rm.repoInformerSynced) { + return errors.New("timed out waiting for cache to sync") + } + + repo, err := rm.repoEnsurer.EnsureRepo(ctx, rm.namespace, snapshot.Repo) if err != nil { return err } - rm.repoLocker.LockExclusive(snapshot.Repo) - defer rm.repoLocker.UnlockExclusive(snapshot.Repo) + // restic forget requires an exclusive lock + rm.repoLocker.LockExclusive(repo.Name) + defer rm.repoLocker.UnlockExclusive(repo.Name) - cmd := ForgetCommand(repo.Spec.ResticIdentifier, snapshot.SnapshotID) - - return rm.exec(cmd) + return rm.exec(ForgetCommand(repo.Spec.ResticIdentifier, snapshot.SnapshotID)) } func (rm *repositoryManager) exec(cmd *Command) error { - file, err := TempCredentialsFile(rm.secretsLister, cmd.RepoName()) + file, err := TempCredentialsFile(rm.secretsLister, rm.namespace, cmd.RepoName()) if err != nil { return err } diff --git a/pkg/restic/restorer.go b/pkg/restic/restorer.go index ebfdd3a1a..6d538b735 100644 --- a/pkg/restic/restorer.go +++ b/pkg/restic/restorer.go @@ -28,7 +28,6 @@ import ( "k8s.io/client-go/tools/cache" arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" - arkv1listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" "github.com/heptio/ark/pkg/util/boolptr" ) @@ -41,7 +40,7 @@ type Restorer interface { type restorer struct { ctx context.Context repoManager *repositoryManager - repoLister arkv1listers.ResticRepositoryLister + repoEnsurer *repositoryEnsurer resultsLock sync.Mutex results map[string]chan *arkv1api.PodVolumeRestore @@ -50,13 +49,14 @@ type restorer struct { func newRestorer( ctx context.Context, rm *repositoryManager, + repoEnsurer *repositoryEnsurer, podVolumeRestoreInformer cache.SharedIndexInformer, - repoLister arkv1listers.ResticRepositoryLister, + log logrus.FieldLogger, ) *restorer { r := &restorer{ ctx: ctx, repoManager: rm, - repoLister: repoLister, + repoEnsurer: repoEnsurer, results: make(map[string]chan *arkv1api.PodVolumeRestore), } @@ -68,8 +68,14 @@ func newRestorer( if pvr.Status.Phase == arkv1api.PodVolumeRestorePhaseCompleted || pvr.Status.Phase == arkv1api.PodVolumeRestorePhaseFailed { r.resultsLock.Lock() - r.results[resultsKey(pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name)] <- pvr - r.resultsLock.Unlock() + defer r.resultsLock.Unlock() + + resChan, ok := r.results[resultsKey(pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name)] + if !ok { + log.Errorf("No results channel found for pod %s/%s to send pod volume restore %s/%s on", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name, pvr.Namespace, pvr.Name) + return + } + resChan <- pvr } }, }, @@ -85,11 +91,16 @@ func (r *restorer) RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.P return nil } - repo, err := getReadyRepo(r.repoLister, restore.Namespace, pod.Namespace) + repo, err := r.repoEnsurer.EnsureRepo(r.ctx, restore.Namespace, pod.Namespace) if err != nil { return []error{err} } + // get a single non-exclusive lock since we'll wait for all individual + // restores to be complete before releasing it. + r.repoManager.repoLocker.Lock(pod.Namespace) + defer r.repoManager.repoLocker.Unlock(pod.Namespace) + resultsChan := make(chan *arkv1api.PodVolumeRestore) r.resultsLock.Lock() @@ -102,9 +113,6 @@ func (r *restorer) RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.P ) for volume, snapshot := range volumesToRestore { - r.repoManager.repoLocker.Lock(pod.Namespace) - defer r.repoManager.repoLocker.Unlock(pod.Namespace) - volumeRestore := newPodVolumeRestore(restore, pod, volume, snapshot, repo.Spec.ResticIdentifier) if err := errorOnly(r.repoManager.arkClient.ArkV1().PodVolumeRestores(volumeRestore.Namespace).Create(volumeRestore)); err != nil {