replace ark restic repo init cmd with automatic initialization of repos

Signed-off-by: Steve Kriss <steve@heptio.com>
This commit is contained in:
Steve Kriss
2018-06-22 12:07:23 -07:00
parent e015238e6d
commit 22e8f23e2c
19 changed files with 360 additions and 433 deletions

View File

@@ -31,5 +31,4 @@ Work with restic repositories
### SEE ALSO ### SEE ALSO
* [ark restic](ark_restic.md) - Work with restic * [ark restic](ark_restic.md) - Work with restic
* [ark restic repo get](ark_restic_repo_get.md) - Get restic repositories * [ark restic repo get](ark_restic_repo_get.md) - Get restic repositories
* [ark restic repo init](ark_restic_repo_init.md) - initialize a restic repository for a specified namespace

View File

@@ -1,41 +0,0 @@
## ark restic repo init
initialize a restic repository for a specified namespace
### Synopsis
initialize a restic repository for a specified namespace
```
ark restic repo init NAMESPACE [flags]
```
### Options
```
-h, --help help for init
--key-data string Encryption key for the restic repository. Optional; if unset, Ark will generate a random key for you.
--key-file string Path to file containing the encryption key for the restic repository. Optional; if unset, Ark will generate a random key for you.
--key-size int Size of the generated key for the restic repository (default 1024)
--maintenance-frequency duration How often maintenance (i.e. restic prune & check) is run on the repository (default 24h0m0s)
```
### Options inherited from parent commands
```
--alsologtostderr log to standard error as well as files
--kubeconfig string Path to the kubeconfig file to use to talk to the Kubernetes apiserver. If unset, try the environment variable KUBECONFIG, as well as in-cluster configuration
--kubecontext string The context to use to talk to the Kubernetes apiserver. If unset defaults to whatever your current-context is (kubectl config current-context)
--log_backtrace_at traceLocation when logging hits line file:N, emit a stack trace (default :0)
--log_dir string If non-empty, write log files in this directory
--logtostderr log to standard error instead of files
-n, --namespace string The namespace in which Ark should operate (default "heptio-ark")
--stderrthreshold severity logs at or above this threshold go to stderr (default 2)
-v, --v Level log level for V logs
--vmodule moduleSpec comma-separated list of pattern=N settings for file-filtered logging
```
### SEE ALSO
* [ark restic repo](ark_restic_repo.md) - Work with restic repositories

View File

@@ -1,165 +0,0 @@
/*
Copyright 2018 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package repo
import (
"crypto/rand"
"time"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kclientset "k8s.io/client-go/kubernetes"
"github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/client"
"github.com/heptio/ark/pkg/cmd"
clientset "github.com/heptio/ark/pkg/generated/clientset/versioned"
"github.com/heptio/ark/pkg/restic"
"github.com/heptio/ark/pkg/util/filesystem"
)
func NewInitCommand(f client.Factory) *cobra.Command {
o := NewInitRepositoryOptions()
c := &cobra.Command{
Use: "init NAMESPACE",
Short: "initialize a restic repository for a specified namespace",
Long: "initialize a restic repository for a specified namespace",
Args: cobra.ExactArgs(1),
Run: func(c *cobra.Command, args []string) {
cmd.CheckError(o.Complete(f, args))
cmd.CheckError(o.Validate(f))
cmd.CheckError(o.Run(f))
},
}
o.BindFlags(c.Flags())
return c
}
type InitRepositoryOptions struct {
Namespace string
KeyFile string
KeyData string
KeySize int
MaintenanceFrequency time.Duration
fileSystem filesystem.Interface
kubeClient kclientset.Interface
arkClient clientset.Interface
keyBytes []byte
}
func NewInitRepositoryOptions() *InitRepositoryOptions {
return &InitRepositoryOptions{
KeySize: 1024,
MaintenanceFrequency: restic.DefaultMaintenanceFrequency,
fileSystem: filesystem.NewFileSystem(),
}
}
var (
errKeyFileAndKeyDataProvided = errors.Errorf("only one of --key-file and --key-data may be specified")
errKeySizeTooSmall = errors.Errorf("--key-size must be at least 1")
)
func (o *InitRepositoryOptions) BindFlags(flags *pflag.FlagSet) {
flags.StringVar(&o.KeyFile, "key-file", o.KeyFile, "Path to file containing the encryption key for the restic repository. Optional; if unset, Ark will generate a random key for you.")
flags.StringVar(&o.KeyData, "key-data", o.KeyData, "Encryption key for the restic repository. Optional; if unset, Ark will generate a random key for you.")
flags.IntVar(&o.KeySize, "key-size", o.KeySize, "Size of the generated key for the restic repository")
flags.DurationVar(&o.MaintenanceFrequency, "maintenance-frequency", o.MaintenanceFrequency, "How often maintenance (i.e. restic prune & check) is run on the repository")
}
func (o *InitRepositoryOptions) Complete(f client.Factory, args []string) error {
if o.KeyFile != "" && o.KeyData != "" {
return errKeyFileAndKeyDataProvided
}
if o.KeyFile == "" && o.KeyData == "" && o.KeySize < 1 {
return errKeySizeTooSmall
}
o.Namespace = args[0]
switch {
case o.KeyFile != "":
data, err := o.fileSystem.ReadFile(o.KeyFile)
if err != nil {
return err
}
o.keyBytes = data
case o.KeyData != "":
o.keyBytes = []byte(o.KeyData)
case o.KeySize > 0:
o.keyBytes = make([]byte, o.KeySize)
// rand.Reader always returns a nil error
rand.Read(o.keyBytes)
}
return nil
}
func (o *InitRepositoryOptions) Validate(f client.Factory) error {
if len(o.keyBytes) == 0 {
return errors.Errorf("keyBytes is required")
}
if o.MaintenanceFrequency <= 0 {
return errors.Errorf("--maintenance-frequency must be greater than zero")
}
kubeClient, err := f.KubeClient()
if err != nil {
return err
}
o.kubeClient = kubeClient
if _, err := kubeClient.CoreV1().Namespaces().Get(o.Namespace, metav1.GetOptions{}); err != nil {
return err
}
arkClient, err := f.Client()
if err != nil {
return err
}
o.arkClient = arkClient
return nil
}
func (o *InitRepositoryOptions) Run(f client.Factory) error {
if err := restic.NewRepositoryKey(o.kubeClient.CoreV1(), o.Namespace, o.keyBytes); err != nil {
return err
}
repo := &v1.ResticRepository{
ObjectMeta: metav1.ObjectMeta{
Namespace: f.Namespace(),
Name: o.Namespace,
},
Spec: v1.ResticRepositorySpec{
MaintenanceFrequency: metav1.Duration{Duration: o.MaintenanceFrequency},
},
}
_, err := o.arkClient.ArkV1().ResticRepositories(f.Namespace()).Create(repo)
return errors.Wrap(err, "error creating ResticRepository")
}

View File

@@ -1,88 +0,0 @@
/*
Copyright 2018 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package repo
import (
"testing"
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
"k8s.io/client-go/kubernetes"
"github.com/heptio/ark/pkg/client"
clientset "github.com/heptio/ark/pkg/generated/clientset/versioned"
arktest "github.com/heptio/ark/pkg/util/test"
)
type fakeFactory struct{}
var _ client.Factory = &fakeFactory{}
func (f *fakeFactory) BindFlags(flags *pflag.FlagSet) {
panic("not implemented")
}
func (f *fakeFactory) Client() (clientset.Interface, error) {
panic("not implemented")
}
func (f *fakeFactory) KubeClient() (kubernetes.Interface, error) {
panic("not implemented")
}
func (f *fakeFactory) Namespace() string {
return ""
}
func TestComplete(t *testing.T) {
// no key options provided should error
o := &InitRepositoryOptions{}
err := o.Complete(&fakeFactory{}, []string{"ns"})
assert.EqualError(t, err, errKeySizeTooSmall.Error())
// both KeyFile and KeyData provided should error
o = &InitRepositoryOptions{
KeyFile: "/foo",
KeyData: "bar",
}
err = o.Complete(&fakeFactory{}, []string{"ns"})
assert.EqualError(t, err, errKeyFileAndKeyDataProvided.Error())
// if KeyFile is provided, its contents are used
fileContents := []byte("bar")
o = &InitRepositoryOptions{
KeyFile: "/foo",
fileSystem: arktest.NewFakeFileSystem().WithFile("/foo", fileContents),
}
assert.NoError(t, o.Complete(&fakeFactory{}, []string{"ns"}))
assert.Equal(t, fileContents, o.keyBytes)
// if KeyData is provided, it's used
o = &InitRepositoryOptions{
KeyData: "bar",
}
assert.NoError(t, o.Complete(&fakeFactory{}, []string{"ns"}))
assert.Equal(t, []byte(o.KeyData), o.keyBytes)
// if KeySize is provided, a random key is generated
o = &InitRepositoryOptions{
KeySize: 10,
}
assert.NoError(t, o.Complete(&fakeFactory{}, []string{"ns"}))
assert.Equal(t, o.KeySize, len(o.keyBytes))
}

View File

@@ -30,7 +30,6 @@ func NewRepositoryCommand(f client.Factory) *cobra.Command {
} }
c.AddCommand( c.AddCommand(
NewInitCommand(f),
NewGetCommand(f, "get"), NewGetCommand(f, "get"),
) )

View File

@@ -24,6 +24,7 @@ import (
"github.com/heptio/ark/pkg/controller" "github.com/heptio/ark/pkg/controller"
clientset "github.com/heptio/ark/pkg/generated/clientset/versioned" clientset "github.com/heptio/ark/pkg/generated/clientset/versioned"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions" informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/restic"
"github.com/heptio/ark/pkg/util/logging" "github.com/heptio/ark/pkg/util/logging"
) )
@@ -59,6 +60,7 @@ type resticServer struct {
arkInformerFactory informers.SharedInformerFactory arkInformerFactory informers.SharedInformerFactory
kubeInformerFactory kubeinformers.SharedInformerFactory kubeInformerFactory kubeinformers.SharedInformerFactory
podInformer cache.SharedIndexInformer podInformer cache.SharedIndexInformer
secretInformer cache.SharedIndexInformer
logger logrus.FieldLogger logger logrus.FieldLogger
ctx context.Context ctx context.Context
cancelFunc context.CancelFunc cancelFunc context.CancelFunc
@@ -84,7 +86,7 @@ func newResticServer(logger logrus.FieldLogger, baseName string) (*resticServer,
// filter to only pods scheduled on this node. // filter to only pods scheduled on this node.
podInformer := corev1informers.NewFilteredPodInformer( podInformer := corev1informers.NewFilteredPodInformer(
kubeClient, kubeClient,
"", metav1.NamespaceAll,
0, 0,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
func(opts *metav1.ListOptions) { func(opts *metav1.ListOptions) {
@@ -92,6 +94,22 @@ func newResticServer(logger logrus.FieldLogger, baseName string) (*resticServer,
}, },
) )
// use a stand-alone secrets informer so we can filter to only the restic credentials
// secret(s) within the heptio-ark namespace
//
// note: using an informer to access the single secret for all ark-managed
// restic repositories is overkill for now, but will be useful when we move
// to fully-encrypted backups and have unique keys per repository.
secretInformer := corev1informers.NewFilteredSecretInformer(
kubeClient,
os.Getenv("HEPTIO_ARK_NAMESPACE"),
0,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
func(opts *metav1.ListOptions) {
opts.FieldSelector = fmt.Sprintf("metadata.name=%s", restic.CredentialsSecretName)
},
)
ctx, cancelFunc := context.WithCancel(context.Background()) ctx, cancelFunc := context.WithCancel(context.Background())
return &resticServer{ return &resticServer{
@@ -100,6 +118,7 @@ func newResticServer(logger logrus.FieldLogger, baseName string) (*resticServer,
arkInformerFactory: informers.NewFilteredSharedInformerFactory(arkClient, 0, os.Getenv("HEPTIO_ARK_NAMESPACE"), nil), arkInformerFactory: informers.NewFilteredSharedInformerFactory(arkClient, 0, os.Getenv("HEPTIO_ARK_NAMESPACE"), nil),
kubeInformerFactory: kubeinformers.NewSharedInformerFactory(kubeClient, 0), kubeInformerFactory: kubeinformers.NewSharedInformerFactory(kubeClient, 0),
podInformer: podInformer, podInformer: podInformer,
secretInformer: secretInformer,
logger: logger, logger: logger,
ctx: ctx, ctx: ctx,
cancelFunc: cancelFunc, cancelFunc: cancelFunc,
@@ -118,7 +137,7 @@ func (s *resticServer) run() {
s.arkInformerFactory.Ark().V1().PodVolumeBackups(), s.arkInformerFactory.Ark().V1().PodVolumeBackups(),
s.arkClient.ArkV1(), s.arkClient.ArkV1(),
s.podInformer, s.podInformer,
s.kubeInformerFactory.Core().V1().Secrets(), s.secretInformer,
s.kubeInformerFactory.Core().V1().PersistentVolumeClaims(), s.kubeInformerFactory.Core().V1().PersistentVolumeClaims(),
os.Getenv("NODE_NAME"), os.Getenv("NODE_NAME"),
) )
@@ -133,7 +152,7 @@ func (s *resticServer) run() {
s.arkInformerFactory.Ark().V1().PodVolumeRestores(), s.arkInformerFactory.Ark().V1().PodVolumeRestores(),
s.arkClient.ArkV1(), s.arkClient.ArkV1(),
s.podInformer, s.podInformer,
s.kubeInformerFactory.Core().V1().Secrets(), s.secretInformer,
s.kubeInformerFactory.Core().V1().PersistentVolumeClaims(), s.kubeInformerFactory.Core().V1().PersistentVolumeClaims(),
os.Getenv("NODE_NAME"), os.Getenv("NODE_NAME"),
) )
@@ -146,6 +165,7 @@ func (s *resticServer) run() {
go s.arkInformerFactory.Start(s.ctx.Done()) go s.arkInformerFactory.Start(s.ctx.Done())
go s.kubeInformerFactory.Start(s.ctx.Done()) go s.kubeInformerFactory.Start(s.ctx.Done())
go s.podInformer.Run(s.ctx.Done()) go s.podInformer.Run(s.ctx.Done())
go s.secretInformer.Run(s.ctx.Done())
s.logger.Info("Controllers started successfully") s.logger.Info("Controllers started successfully")

View File

@@ -235,14 +235,6 @@ func (s *server) run() error {
if err := s.initRestic(config.BackupStorageProvider); err != nil { if err := s.initRestic(config.BackupStorageProvider); err != nil {
return err return err
} }
// warn if restic daemonset does not exist
_, err := s.kubeClient.AppsV1().DaemonSets(s.namespace).Get("restic", metav1.GetOptions{})
if apierrors.IsNotFound(err) {
s.logger.Warn("Ark restic DaemonSet not found; restic backups will fail until it's created")
} else if err != nil {
return errors.WithStack(err)
}
} }
if err := s.runControllers(config); err != nil { if err := s.runControllers(config); err != nil {
@@ -457,15 +449,33 @@ func durationMin(a, b time.Duration) time.Duration {
} }
func (s *server) initRestic(config api.ObjectStorageProviderConfig) error { func (s *server) initRestic(config api.ObjectStorageProviderConfig) error {
// warn if restic daemonset does not exist
if _, err := s.kubeClient.AppsV1().DaemonSets(s.namespace).Get(restic.DaemonSet, metav1.GetOptions{}); apierrors.IsNotFound(err) {
s.logger.Warn("Ark restic daemonset not found; restic backups/restores will not work until it's created")
} else if err != nil {
s.logger.WithError(errors.WithStack(err)).Warn("Error checking for existence of ark restic daemonset")
}
// ensure the repo key secret is set up
if err := restic.EnsureCommonRepositoryKey(s.kubeClient.CoreV1(), s.namespace); err != nil {
return err
}
// set the env vars that restic uses for creds purposes // set the env vars that restic uses for creds purposes
if config.Name == string(restic.AzureBackend) { if config.Name == string(restic.AzureBackend) {
os.Setenv("AZURE_ACCOUNT_NAME", os.Getenv("AZURE_STORAGE_ACCOUNT_ID")) os.Setenv("AZURE_ACCOUNT_NAME", os.Getenv("AZURE_STORAGE_ACCOUNT_ID"))
os.Setenv("AZURE_ACCOUNT_KEY", os.Getenv("AZURE_STORAGE_KEY")) os.Setenv("AZURE_ACCOUNT_KEY", os.Getenv("AZURE_STORAGE_KEY"))
} }
// use a stand-alone secrets informer so we can filter to only the restic credentials
// secret(s) within the heptio-ark namespace
//
// note: using an informer to access the single secret for all ark-managed
// restic repositories is overkill for now, but will be useful when we move
// to fully-encrypted backups and have unique keys per repository.
secretsInformer := corev1informers.NewFilteredSecretInformer( secretsInformer := corev1informers.NewFilteredSecretInformer(
s.kubeClient, s.kubeClient,
"", s.namespace,
0, 0,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
func(opts *metav1.ListOptions) { func(opts *metav1.ListOptions) {
@@ -479,8 +489,8 @@ func (s *server) initRestic(config api.ObjectStorageProviderConfig) error {
s.namespace, s.namespace,
s.arkClient, s.arkClient,
secretsInformer, secretsInformer,
s.kubeClient.CoreV1(),
s.sharedInformerFactory.Ark().V1().ResticRepositories(), s.sharedInformerFactory.Ark().V1().ResticRepositories(),
s.arkClient.ArkV1(),
s.logger, s.logger,
) )
if err != nil { if err != nil {

View File

@@ -17,6 +17,7 @@ limitations under the License.
package controller package controller
import ( import (
"context"
"encoding/json" "encoding/json"
"time" "time"
@@ -39,6 +40,8 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
) )
const resticTimeout = time.Minute
type backupDeletionController struct { type backupDeletionController struct {
*genericController *genericController
@@ -239,13 +242,9 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e
// Try to delete restic snapshots // Try to delete restic snapshots
log.Info("Removing restic snapshots") log.Info("Removing restic snapshots")
if snapshots, err := restic.GetSnapshotsInBackup(backup, c.podvolumeBackupLister); err != nil { if deleteErrs := c.deleteResticSnapshots(backup); len(deleteErrs) > 0 {
errs = append(errs, err.Error()) for _, err := range deleteErrs {
} else { errs = append(errs, err.Error())
for _, snapshot := range snapshots {
if err := c.resticMgr.Forget(snapshot); err != nil {
errs = append(errs, err.Error())
}
} }
} }
@@ -304,6 +303,29 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e
return nil return nil
} }
func (c *backupDeletionController) deleteResticSnapshots(backup *v1.Backup) []error {
if c.resticMgr == nil {
return nil
}
snapshots, err := restic.GetSnapshotsInBackup(backup, c.podvolumeBackupLister)
if err != nil {
return []error{err}
}
ctx, cancelFunc := context.WithTimeout(context.Background(), resticTimeout)
defer cancelFunc()
var errs []error
for _, snapshot := range snapshots {
if err := c.resticMgr.Forget(ctx, snapshot); err != nil {
errs = append(errs, err)
}
}
return errs
}
const deleteBackupRequestMaxAge = 24 * time.Hour const deleteBackupRequestMaxAge = 24 * time.Hour
func (c *backupDeletionController) deleteExpiredRequests() { func (c *backupDeletionController) deleteExpiredRequests() {

View File

@@ -60,7 +60,7 @@ func NewPodVolumeBackupController(
podVolumeBackupInformer informers.PodVolumeBackupInformer, podVolumeBackupInformer informers.PodVolumeBackupInformer,
podVolumeBackupClient arkv1client.PodVolumeBackupsGetter, podVolumeBackupClient arkv1client.PodVolumeBackupsGetter,
podInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer,
secretInformer corev1informers.SecretInformer, secretInformer cache.SharedIndexInformer,
pvcInformer corev1informers.PersistentVolumeClaimInformer, pvcInformer corev1informers.PersistentVolumeClaimInformer,
nodeName string, nodeName string,
) Interface { ) Interface {
@@ -69,7 +69,7 @@ func NewPodVolumeBackupController(
podVolumeBackupClient: podVolumeBackupClient, podVolumeBackupClient: podVolumeBackupClient,
podVolumeBackupLister: podVolumeBackupInformer.Lister(), podVolumeBackupLister: podVolumeBackupInformer.Lister(),
podLister: corev1listers.NewPodLister(podInformer.GetIndexer()), podLister: corev1listers.NewPodLister(podInformer.GetIndexer()),
secretLister: secretInformer.Lister(), secretLister: corev1listers.NewSecretLister(secretInformer.GetIndexer()),
pvcLister: pvcInformer.Lister(), pvcLister: pvcInformer.Lister(),
nodeName: nodeName, nodeName: nodeName,
} }
@@ -78,8 +78,8 @@ func NewPodVolumeBackupController(
c.cacheSyncWaiters = append( c.cacheSyncWaiters = append(
c.cacheSyncWaiters, c.cacheSyncWaiters,
podVolumeBackupInformer.Informer().HasSynced, podVolumeBackupInformer.Informer().HasSynced,
secretInformer.Informer().HasSynced,
podInformer.HasSynced, podInformer.HasSynced,
secretInformer.HasSynced,
pvcInformer.Informer().HasSynced, pvcInformer.Informer().HasSynced,
) )
c.processBackupFunc = c.processBackup c.processBackupFunc = c.processBackup
@@ -194,7 +194,7 @@ func (c *podVolumeBackupController) processBackup(req *arkv1api.PodVolumeBackup)
log.WithField("path", path).Debugf("Found path matching glob") log.WithField("path", path).Debugf("Found path matching glob")
// temp creds // temp creds
file, err := restic.TempCredentialsFile(c.secretLister, req.Spec.Pod.Namespace) file, err := restic.TempCredentialsFile(c.secretLister, req.Namespace, req.Spec.Pod.Namespace)
if err != nil { if err != nil {
log.WithError(err).Error("Error creating temp restic credentials file") log.WithError(err).Error("Error creating temp restic credentials file")
return c.fail(req, errors.Wrap(err, "error creating temp restic credentials file").Error(), log) return c.fail(req, errors.Wrap(err, "error creating temp restic credentials file").Error(), log)

View File

@@ -50,8 +50,8 @@ type podVolumeRestoreController struct {
podVolumeRestoreClient arkv1client.PodVolumeRestoresGetter podVolumeRestoreClient arkv1client.PodVolumeRestoresGetter
podVolumeRestoreLister listers.PodVolumeRestoreLister podVolumeRestoreLister listers.PodVolumeRestoreLister
secretLister corev1listers.SecretLister
podLister corev1listers.PodLister podLister corev1listers.PodLister
secretLister corev1listers.SecretLister
pvcLister corev1listers.PersistentVolumeClaimLister pvcLister corev1listers.PersistentVolumeClaimLister
nodeName string nodeName string
@@ -64,7 +64,7 @@ func NewPodVolumeRestoreController(
podVolumeRestoreInformer informers.PodVolumeRestoreInformer, podVolumeRestoreInformer informers.PodVolumeRestoreInformer,
podVolumeRestoreClient arkv1client.PodVolumeRestoresGetter, podVolumeRestoreClient arkv1client.PodVolumeRestoresGetter,
podInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer,
secretInformer corev1informers.SecretInformer, secretInformer cache.SharedIndexInformer,
pvcInformer corev1informers.PersistentVolumeClaimInformer, pvcInformer corev1informers.PersistentVolumeClaimInformer,
nodeName string, nodeName string,
) Interface { ) Interface {
@@ -73,7 +73,7 @@ func NewPodVolumeRestoreController(
podVolumeRestoreClient: podVolumeRestoreClient, podVolumeRestoreClient: podVolumeRestoreClient,
podVolumeRestoreLister: podVolumeRestoreInformer.Lister(), podVolumeRestoreLister: podVolumeRestoreInformer.Lister(),
podLister: corev1listers.NewPodLister(podInformer.GetIndexer()), podLister: corev1listers.NewPodLister(podInformer.GetIndexer()),
secretLister: secretInformer.Lister(), secretLister: corev1listers.NewSecretLister(secretInformer.GetIndexer()),
pvcLister: pvcInformer.Lister(), pvcLister: pvcInformer.Lister(),
nodeName: nodeName, nodeName: nodeName,
} }
@@ -82,8 +82,8 @@ func NewPodVolumeRestoreController(
c.cacheSyncWaiters = append( c.cacheSyncWaiters = append(
c.cacheSyncWaiters, c.cacheSyncWaiters,
podVolumeRestoreInformer.Informer().HasSynced, podVolumeRestoreInformer.Informer().HasSynced,
secretInformer.Informer().HasSynced,
podInformer.HasSynced, podInformer.HasSynced,
secretInformer.HasSynced,
pvcInformer.Informer().HasSynced, pvcInformer.Informer().HasSynced,
) )
c.processRestoreFunc = c.processRestore c.processRestoreFunc = c.processRestore
@@ -270,7 +270,7 @@ func (c *podVolumeRestoreController) processRestore(req *arkv1api.PodVolumeResto
return c.failRestore(req, errors.Wrap(err, "error getting volume directory name").Error(), log) return c.failRestore(req, errors.Wrap(err, "error getting volume directory name").Error(), log)
} }
credsFile, err := restic.TempCredentialsFile(c.secretLister, req.Spec.Pod.Namespace) credsFile, err := restic.TempCredentialsFile(c.secretLister, req.Namespace, req.Spec.Pod.Namespace)
if err != nil { if err != nil {
log.WithError(err).Error("Error creating temp restic credentials file") log.WithError(err).Error("Error creating temp restic credentials file")
return c.failRestore(req, errors.Wrap(err, "error creating temp restic credentials file").Error(), log) return c.failRestore(req, errors.Wrap(err, "error creating temp restic credentials file").Error(), log)

View File

@@ -148,7 +148,7 @@ func (c *resticRepositoryController) initializeRepo(req *v1.ResticRepository, lo
return err return err
} }
if err := ensureRepo(req.Name, c.repositoryManager); err != nil { if err := ensureRepo(req.Name, req.Spec.ResticIdentifier, c.repositoryManager); err != nil {
return c.patchResticRepository(req, repoNotReady(err.Error())) return c.patchResticRepository(req, repoNotReady(err.Error()))
} }
@@ -160,12 +160,12 @@ func (c *resticRepositoryController) initializeRepo(req *v1.ResticRepository, lo
// ensureRepo first checks the repo, and returns if check passes. If it fails, // ensureRepo first checks the repo, and returns if check passes. If it fails,
// attempts to init the repo, and returns the result. // attempts to init the repo, and returns the result.
func ensureRepo(name string, repoManager restic.RepositoryManager) error { func ensureRepo(name, identifier string, repoManager restic.RepositoryManager) error {
if repoManager.CheckRepo(name) == nil { if repoManager.CheckRepo(name, identifier) == nil {
return nil return nil
} }
return repoManager.InitRepo(name) return repoManager.InitRepo(name, identifier)
} }
func (c *resticRepositoryController) runMaintenanceIfDue(req *v1.ResticRepository, log logrus.FieldLogger) error { func (c *resticRepositoryController) runMaintenanceIfDue(req *v1.ResticRepository, log logrus.FieldLogger) error {
@@ -181,14 +181,14 @@ func (c *resticRepositoryController) runMaintenanceIfDue(req *v1.ResticRepositor
log.Info("Running maintenance on restic repository") log.Info("Running maintenance on restic repository")
log.Debug("Checking repo before prune") log.Debug("Checking repo before prune")
if err := c.repositoryManager.CheckRepo(req.Name); err != nil { if err := c.repositoryManager.CheckRepo(req.Name, req.Spec.ResticIdentifier); err != nil {
return c.patchResticRepository(req, repoNotReady(err.Error())) return c.patchResticRepository(req, repoNotReady(err.Error()))
} }
// prune failures should be displayed in the `.status.message` field but // prune failures should be displayed in the `.status.message` field but
// should not cause the repo to move to `NotReady`. // should not cause the repo to move to `NotReady`.
log.Debug("Pruning repo") log.Debug("Pruning repo")
if err := c.repositoryManager.PruneRepo(req.Name); err != nil { if err := c.repositoryManager.PruneRepo(req.Name, req.Spec.ResticIdentifier); err != nil {
log.WithError(err).Warn("error pruning repository") log.WithError(err).Warn("error pruning repository")
if patchErr := c.patchResticRepository(req, func(r *v1.ResticRepository) { if patchErr := c.patchResticRepository(req, func(r *v1.ResticRepository) {
r.Status.Message = err.Error() r.Status.Message = err.Error()
@@ -198,7 +198,7 @@ func (c *resticRepositoryController) runMaintenanceIfDue(req *v1.ResticRepositor
} }
log.Debug("Checking repo after prune") log.Debug("Checking repo after prune")
if err := c.repositoryManager.CheckRepo(req.Name); err != nil { if err := c.repositoryManager.CheckRepo(req.Name, req.Spec.ResticIdentifier); err != nil {
return c.patchResticRepository(req, repoNotReady(err.Error())) return c.patchResticRepository(req, repoNotReady(err.Error()))
} }
@@ -216,7 +216,7 @@ func (c *resticRepositoryController) checkNotReadyRepo(req *v1.ResticRepository,
// we need to ensure it (first check, if check fails, attempt to init) // we need to ensure it (first check, if check fails, attempt to init)
// because we don't know if it's been successfully initialized yet. // because we don't know if it's been successfully initialized yet.
if err := ensureRepo(req.Name, c.repositoryManager); err != nil { if err := ensureRepo(req.Name, req.Spec.ResticIdentifier, c.repositoryManager); err != nil {
return c.patchResticRepository(req, repoNotReady(err.Error())) return c.patchResticRepository(req, repoNotReady(err.Error()))
} }

View File

@@ -25,12 +25,10 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
corev1api "k8s.io/api/core/v1" corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
arkv1listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
"github.com/heptio/ark/pkg/util/boolptr" "github.com/heptio/ark/pkg/util/boolptr"
) )
@@ -43,7 +41,7 @@ type Backupper interface {
type backupper struct { type backupper struct {
ctx context.Context ctx context.Context
repoManager *repositoryManager repoManager *repositoryManager
repoLister arkv1listers.ResticRepositoryLister repoEnsurer *repositoryEnsurer
results map[string]chan *arkv1api.PodVolumeBackup results map[string]chan *arkv1api.PodVolumeBackup
resultsLock sync.Mutex resultsLock sync.Mutex
@@ -52,13 +50,14 @@ type backupper struct {
func newBackupper( func newBackupper(
ctx context.Context, ctx context.Context,
repoManager *repositoryManager, repoManager *repositoryManager,
repoEnsurer *repositoryEnsurer,
podVolumeBackupInformer cache.SharedIndexInformer, podVolumeBackupInformer cache.SharedIndexInformer,
repoLister arkv1listers.ResticRepositoryLister, log logrus.FieldLogger,
) *backupper { ) *backupper {
b := &backupper{ b := &backupper{
ctx: ctx, ctx: ctx,
repoManager: repoManager, repoManager: repoManager,
repoLister: repoLister, repoEnsurer: repoEnsurer,
results: make(map[string]chan *arkv1api.PodVolumeBackup), results: make(map[string]chan *arkv1api.PodVolumeBackup),
} }
@@ -70,8 +69,14 @@ func newBackupper(
if pvb.Status.Phase == arkv1api.PodVolumeBackupPhaseCompleted || pvb.Status.Phase == arkv1api.PodVolumeBackupPhaseFailed { if pvb.Status.Phase == arkv1api.PodVolumeBackupPhaseCompleted || pvb.Status.Phase == arkv1api.PodVolumeBackupPhaseFailed {
b.resultsLock.Lock() b.resultsLock.Lock()
b.results[resultsKey(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name)] <- pvb defer b.resultsLock.Unlock()
b.resultsLock.Unlock()
resChan, ok := b.results[resultsKey(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name)]
if !ok {
log.Errorf("No results channel found for pod %s/%s to send pod volume backup %s/%s on", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name, pvb.Namespace, pvb.Name)
return
}
resChan <- pvb
} }
}, },
}, },
@@ -84,31 +89,6 @@ func resultsKey(ns, name string) string {
return fmt.Sprintf("%s/%s", ns, name) return fmt.Sprintf("%s/%s", ns, name)
} }
func getRepo(repoLister arkv1listers.ResticRepositoryLister, ns, name string) (*arkv1api.ResticRepository, error) {
repo, err := repoLister.ResticRepositories(ns).Get(name)
if apierrors.IsNotFound(err) {
return nil, errors.Wrapf(err, "restic repository not found")
}
if err != nil {
return nil, errors.Wrapf(err, "error getting restic repository")
}
return repo, nil
}
func getReadyRepo(repoLister arkv1listers.ResticRepositoryLister, ns, name string) (*arkv1api.ResticRepository, error) {
repo, err := getRepo(repoLister, ns, name)
if err != nil {
return nil, err
}
if repo.Status.Phase != arkv1api.ResticRepositoryPhaseReady {
return nil, errors.New("restic repository not ready")
}
return repo, nil
}
func (b *backupper) BackupPodVolumes(backup *arkv1api.Backup, pod *corev1api.Pod, log logrus.FieldLogger) (map[string]string, []error) { func (b *backupper) BackupPodVolumes(backup *arkv1api.Backup, pod *corev1api.Pod, log logrus.FieldLogger) (map[string]string, []error) {
// get volumes to backup from pod's annotations // get volumes to backup from pod's annotations
volumesToBackup := GetVolumesToBackup(pod) volumesToBackup := GetVolumesToBackup(pod)
@@ -116,11 +96,16 @@ func (b *backupper) BackupPodVolumes(backup *arkv1api.Backup, pod *corev1api.Pod
return nil, nil return nil, nil
} }
repo, err := getReadyRepo(b.repoLister, backup.Namespace, pod.Namespace) repo, err := b.repoEnsurer.EnsureRepo(b.ctx, backup.Namespace, pod.Namespace)
if err != nil { if err != nil {
return nil, []error{err} return nil, []error{err}
} }
// get a single non-exclusive lock since we'll wait for all individual
// backups to be complete before releasing it.
b.repoManager.repoLocker.Lock(pod.Namespace)
defer b.repoManager.repoLocker.Unlock(pod.Namespace)
resultsChan := make(chan *arkv1api.PodVolumeBackup) resultsChan := make(chan *arkv1api.PodVolumeBackup)
b.resultsLock.Lock() b.resultsLock.Lock()
@@ -133,9 +118,6 @@ func (b *backupper) BackupPodVolumes(backup *arkv1api.Backup, pod *corev1api.Pod
) )
for _, volumeName := range volumesToBackup { for _, volumeName := range volumesToBackup {
b.repoManager.repoLocker.Lock(pod.Namespace)
defer b.repoManager.repoLocker.Unlock(pod.Namespace)
volumeBackup := newPodVolumeBackup(backup, pod, volumeName, repo.Spec.ResticIdentifier) volumeBackup := newPodVolumeBackup(backup, pod, volumeName, repo.Spec.ResticIdentifier)
if err := errorOnly(b.repoManager.arkClient.ArkV1().PodVolumeBackups(volumeBackup.Namespace).Create(volumeBackup)); err != nil { if err := errorOnly(b.repoManager.arkClient.ArkV1().PodVolumeBackups(volumeBackup.Namespace).Create(volumeBackup)); err != nil {

View File

@@ -1,3 +1,19 @@
/*
Copyright 2018 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package restic package restic
import ( import (

View File

@@ -33,6 +33,7 @@ import (
) )
const ( const (
DaemonSet = "restic"
InitContainer = "restic-wait" InitContainer = "restic-wait"
DefaultMaintenanceFrequency = 24 * time.Hour DefaultMaintenanceFrequency = 24 * time.Hour
@@ -143,9 +144,14 @@ func GetSnapshotsInBackup(backup *arkv1api.Backup, podVolumeBackupLister arkv1li
// encryption key for the given repo and returns its path. The // encryption key for the given repo and returns its path. The
// caller should generally call os.Remove() to remove the file // caller should generally call os.Remove() to remove the file
// when done with it. // when done with it.
func TempCredentialsFile(secretLister corev1listers.SecretLister, repoName string) (string, error) { func TempCredentialsFile(secretLister corev1listers.SecretLister, arkNamespace, repoName string) (string, error) {
secretGetter := NewListerSecretGetter(secretLister) secretGetter := NewListerSecretGetter(secretLister)
repoKey, err := GetRepositoryKey(secretGetter, repoName)
// For now, all restic repos share the same key so we don't need the repoName to fetch it.
// When we move to full-backup encryption, we'll likely have a separate key per restic repo
// (all within the Ark server's namespace) so GetRepositoryKey will need to take the repo
// name as an argument as well.
repoKey, err := GetRepositoryKey(secretGetter, arkNamespace)
if err != nil { if err != nil {
return "", err return "", err
} }

View File

@@ -1,3 +1,19 @@
/*
Copyright 2018 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package restic package restic
import ( import (

View File

@@ -0,0 +1,122 @@
/*
Copyright 2018 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package restic
import (
"context"
"sync"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
arkv1informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
arkv1listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
)
// repositoryEnsurer ensures that Ark restic repositories are created and ready.
type repositoryEnsurer struct {
repoLister arkv1listers.ResticRepositoryLister
repoClient arkv1client.ResticRepositoriesGetter
readyChansLock sync.Mutex
readyChans map[string]chan *arkv1api.ResticRepository
}
func newRepositoryEnsurer(repoInformer arkv1informers.ResticRepositoryInformer, repoClient arkv1client.ResticRepositoriesGetter, log logrus.FieldLogger) *repositoryEnsurer {
r := &repositoryEnsurer{
repoLister: repoInformer.Lister(),
repoClient: repoClient,
readyChans: make(map[string]chan *arkv1api.ResticRepository),
}
repoInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, upd interface{}) {
oldObj := old.(*arkv1api.ResticRepository)
newObj := upd.(*arkv1api.ResticRepository)
if oldObj.Status.Phase != arkv1api.ResticRepositoryPhaseReady && newObj.Status.Phase == arkv1api.ResticRepositoryPhaseReady {
r.readyChansLock.Lock()
defer r.readyChansLock.Unlock()
readyChan, ok := r.readyChans[newObj.Name]
if !ok {
log.Errorf("No ready channel found for repository %s/%s", newObj.Namespace, newObj.Name)
return
}
readyChan <- newObj
delete(r.readyChans, newObj.Name)
}
},
},
)
return r
}
func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, name string) (*arkv1api.ResticRepository, error) {
if repo, err := r.repoLister.ResticRepositories(namespace).Get(name); err != nil && !apierrors.IsNotFound(err) {
return nil, errors.WithStack(err)
} else if err == nil {
if repo.Status.Phase != arkv1api.ResticRepositoryPhaseReady {
return nil, errors.New("restic repository is not ready")
}
return repo, nil
}
// if we're here, it means we got an IsNotFound error, meaning we need to create a new
// repo and wait for it to be ready
repo := &arkv1api.ResticRepository{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: name,
},
Spec: arkv1api.ResticRepositorySpec{
MaintenanceFrequency: metav1.Duration{Duration: DefaultMaintenanceFrequency},
},
}
readyChan := r.getReadyChan(name)
defer close(readyChan)
if _, err := r.repoClient.ResticRepositories(namespace).Create(repo); err != nil {
return nil, errors.Wrapf(err, "unable to create restic repository resource")
}
select {
case <-ctx.Done():
return nil, errors.New("timed out waiting for restic repository to become ready")
case res := <-readyChan:
return res, nil
}
}
func (r *repositoryEnsurer) getReadyChan(name string) chan *arkv1api.ResticRepository {
r.readyChansLock.Lock()
defer r.readyChansLock.Unlock()
r.readyChans[name] = make(chan *arkv1api.ResticRepository)
return r.readyChans[name]
}

View File

@@ -1,8 +1,26 @@
/*
Copyright 2018 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package restic package restic
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"
corev1api "k8s.io/api/core/v1" corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1" corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1" corev1listers "k8s.io/client-go/listers/core/v1"
@@ -10,10 +28,22 @@ import (
const ( const (
CredentialsSecretName = "ark-restic-credentials" CredentialsSecretName = "ark-restic-credentials"
CredentialsKey = "ark-restic-credentials" CredentialsKey = "repository-password"
encryptionKey = "static-passw0rd"
) )
func NewRepositoryKey(secretClient corev1client.SecretsGetter, namespace string, data []byte) error { func EnsureCommonRepositoryKey(secretClient corev1client.SecretsGetter, namespace string) error {
_, err := secretClient.Secrets(namespace).Get(CredentialsSecretName, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return errors.WithStack(err)
}
if err == nil {
return nil
}
// if we got here, we got an IsNotFound error, so we need to create the key
secret := &corev1api.Secret{ secret := &corev1api.Secret{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Namespace: namespace, Namespace: namespace,
@@ -21,13 +51,12 @@ func NewRepositoryKey(secretClient corev1client.SecretsGetter, namespace string,
}, },
Type: corev1api.SecretTypeOpaque, Type: corev1api.SecretTypeOpaque,
Data: map[string][]byte{ Data: map[string][]byte{
CredentialsKey: data, CredentialsKey: []byte(encryptionKey),
}, },
} }
_, err := secretClient.Secrets(namespace).Create(secret) if _, err = secretClient.Secrets(namespace).Create(secret); err != nil {
if err != nil { return errors.Wrapf(err, "error creating %s secret", CredentialsSecretName)
return errors.WithStack(err)
} }
return nil return nil

View File

@@ -25,12 +25,12 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1" corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
clientset "github.com/heptio/ark/pkg/generated/clientset/versioned" clientset "github.com/heptio/ark/pkg/generated/clientset/versioned"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
arkv1informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" arkv1informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
arkv1listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" arkv1listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
arkexec "github.com/heptio/ark/pkg/util/exec" arkexec "github.com/heptio/ark/pkg/util/exec"
@@ -38,18 +38,18 @@ import (
// RepositoryManager executes commands against restic repositories. // RepositoryManager executes commands against restic repositories.
type RepositoryManager interface { type RepositoryManager interface {
// InitRepo initializes a repo with the specified name. // InitRepo initializes a repo with the specified name and identifier.
InitRepo(name string) error InitRepo(name, identifier string) error
// CheckRepo checks the specified repo for errors. // CheckRepo checks the specified repo for errors.
CheckRepo(name string) error CheckRepo(name, identifier string) error
// PruneRepo deletes unused data from a repo. // PruneRepo deletes unused data from a repo.
PruneRepo(name string) error PruneRepo(name, identifier string) error
// Forget removes a snapshot from the list of // Forget removes a snapshot from the list of
// available snapshots in a repo. // available snapshots in a repo.
Forget(snapshot SnapshotIdentifier) error Forget(context.Context, SnapshotIdentifier) error
BackupperFactory BackupperFactory
@@ -74,11 +74,11 @@ type repositoryManager struct {
namespace string namespace string
arkClient clientset.Interface arkClient clientset.Interface
secretsLister corev1listers.SecretLister secretsLister corev1listers.SecretLister
secretsClient corev1client.SecretsGetter
repoLister arkv1listers.ResticRepositoryLister repoLister arkv1listers.ResticRepositoryLister
repoInformerSynced cache.InformerSynced repoInformerSynced cache.InformerSynced
log logrus.FieldLogger log logrus.FieldLogger
repoLocker *repoLocker repoLocker *repoLocker
repoEnsurer *repositoryEnsurer
} }
// NewRepositoryManager constructs a RepositoryManager. // NewRepositoryManager constructs a RepositoryManager.
@@ -87,19 +87,20 @@ func NewRepositoryManager(
namespace string, namespace string,
arkClient clientset.Interface, arkClient clientset.Interface,
secretsInformer cache.SharedIndexInformer, secretsInformer cache.SharedIndexInformer,
secretsClient corev1client.SecretsGetter,
repoInformer arkv1informers.ResticRepositoryInformer, repoInformer arkv1informers.ResticRepositoryInformer,
repoClient arkv1client.ResticRepositoriesGetter,
log logrus.FieldLogger, log logrus.FieldLogger,
) (RepositoryManager, error) { ) (RepositoryManager, error) {
rm := &repositoryManager{ rm := &repositoryManager{
namespace: namespace, namespace: namespace,
arkClient: arkClient, arkClient: arkClient,
secretsLister: corev1listers.NewSecretLister(secretsInformer.GetIndexer()), secretsLister: corev1listers.NewSecretLister(secretsInformer.GetIndexer()),
secretsClient: secretsClient,
repoLister: repoInformer.Lister(), repoLister: repoInformer.Lister(),
repoInformerSynced: repoInformer.Informer().HasSynced, repoInformerSynced: repoInformer.Informer().HasSynced,
log: log, log: log,
repoLocker: newRepoLocker(),
repoLocker: newRepoLocker(),
repoEnsurer: newRepositoryEnsurer(repoInformer, repoClient, log),
} }
if !cache.WaitForCacheSync(ctx.Done(), secretsInformer.HasSynced) { if !cache.WaitForCacheSync(ctx.Done(), secretsInformer.HasSynced) {
@@ -120,7 +121,7 @@ func (rm *repositoryManager) NewBackupper(ctx context.Context, backup *arkv1api.
}, },
) )
b := newBackupper(ctx, rm, informer, rm.repoLister) b := newBackupper(ctx, rm, rm.repoEnsurer, informer, rm.log)
go informer.Run(ctx.Done()) go informer.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced, rm.repoInformerSynced) { if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced, rm.repoInformerSynced) {
@@ -141,72 +142,63 @@ func (rm *repositoryManager) NewRestorer(ctx context.Context, restore *arkv1api.
}, },
) )
r := newRestorer(ctx, rm, informer, rm.repoLister) r := newRestorer(ctx, rm, rm.repoEnsurer, informer, rm.log)
go informer.Run(ctx.Done()) go informer.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced, rm.repoInformerSynced) {
return nil, errors.New("timed out waiting for cache to sync") return nil, errors.New("timed out waiting for cache to sync")
} }
return r, nil return r, nil
} }
func (rm *repositoryManager) InitRepo(name string) error { func (rm *repositoryManager) InitRepo(name, identifier string) error {
repo, err := getRepo(rm.repoLister, rm.namespace, name) // restic init requires an exclusive lock
if err != nil {
return err
}
rm.repoLocker.LockExclusive(name) rm.repoLocker.LockExclusive(name)
defer rm.repoLocker.UnlockExclusive(name) defer rm.repoLocker.UnlockExclusive(name)
return rm.exec(InitCommand(repo.Spec.ResticIdentifier)) return rm.exec(InitCommand(identifier))
} }
func (rm *repositoryManager) CheckRepo(name string) error { func (rm *repositoryManager) CheckRepo(name, identifier string) error {
repo, err := getRepo(rm.repoLister, rm.namespace, name) // restic check requires an exclusive lock
if err != nil {
return err
}
rm.repoLocker.LockExclusive(name) rm.repoLocker.LockExclusive(name)
defer rm.repoLocker.UnlockExclusive(name) defer rm.repoLocker.UnlockExclusive(name)
cmd := CheckCommand(repo.Spec.ResticIdentifier) return rm.exec(CheckCommand(identifier))
return rm.exec(cmd)
} }
func (rm *repositoryManager) PruneRepo(name string) error { func (rm *repositoryManager) PruneRepo(name, identifier string) error {
repo, err := getReadyRepo(rm.repoLister, rm.namespace, name) // restic prune requires an exclusive lock
if err != nil {
return err
}
rm.repoLocker.LockExclusive(name) rm.repoLocker.LockExclusive(name)
defer rm.repoLocker.UnlockExclusive(name) defer rm.repoLocker.UnlockExclusive(name)
cmd := PruneCommand(repo.Spec.ResticIdentifier) return rm.exec(PruneCommand(identifier))
return rm.exec(cmd)
} }
func (rm *repositoryManager) Forget(snapshot SnapshotIdentifier) error { func (rm *repositoryManager) Forget(ctx context.Context, snapshot SnapshotIdentifier) error {
repo, err := getReadyRepo(rm.repoLister, rm.namespace, snapshot.Repo) // We can't wait for this in the constructor, because this informer is coming
// from the shared informer factory, which isn't started until *after* the repo
// manager is instantiated & passed to the controller constructors. We'd get a
// deadlock if we tried to wait for this in the constructor.
if !cache.WaitForCacheSync(ctx.Done(), rm.repoInformerSynced) {
return errors.New("timed out waiting for cache to sync")
}
repo, err := rm.repoEnsurer.EnsureRepo(ctx, rm.namespace, snapshot.Repo)
if err != nil { if err != nil {
return err return err
} }
rm.repoLocker.LockExclusive(snapshot.Repo) // restic forget requires an exclusive lock
defer rm.repoLocker.UnlockExclusive(snapshot.Repo) rm.repoLocker.LockExclusive(repo.Name)
defer rm.repoLocker.UnlockExclusive(repo.Name)
cmd := ForgetCommand(repo.Spec.ResticIdentifier, snapshot.SnapshotID) return rm.exec(ForgetCommand(repo.Spec.ResticIdentifier, snapshot.SnapshotID))
return rm.exec(cmd)
} }
func (rm *repositoryManager) exec(cmd *Command) error { func (rm *repositoryManager) exec(cmd *Command) error {
file, err := TempCredentialsFile(rm.secretsLister, cmd.RepoName()) file, err := TempCredentialsFile(rm.secretsLister, rm.namespace, cmd.RepoName())
if err != nil { if err != nil {
return err return err
} }

View File

@@ -28,7 +28,6 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
arkv1listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
"github.com/heptio/ark/pkg/util/boolptr" "github.com/heptio/ark/pkg/util/boolptr"
) )
@@ -41,7 +40,7 @@ type Restorer interface {
type restorer struct { type restorer struct {
ctx context.Context ctx context.Context
repoManager *repositoryManager repoManager *repositoryManager
repoLister arkv1listers.ResticRepositoryLister repoEnsurer *repositoryEnsurer
resultsLock sync.Mutex resultsLock sync.Mutex
results map[string]chan *arkv1api.PodVolumeRestore results map[string]chan *arkv1api.PodVolumeRestore
@@ -50,13 +49,14 @@ type restorer struct {
func newRestorer( func newRestorer(
ctx context.Context, ctx context.Context,
rm *repositoryManager, rm *repositoryManager,
repoEnsurer *repositoryEnsurer,
podVolumeRestoreInformer cache.SharedIndexInformer, podVolumeRestoreInformer cache.SharedIndexInformer,
repoLister arkv1listers.ResticRepositoryLister, log logrus.FieldLogger,
) *restorer { ) *restorer {
r := &restorer{ r := &restorer{
ctx: ctx, ctx: ctx,
repoManager: rm, repoManager: rm,
repoLister: repoLister, repoEnsurer: repoEnsurer,
results: make(map[string]chan *arkv1api.PodVolumeRestore), results: make(map[string]chan *arkv1api.PodVolumeRestore),
} }
@@ -68,8 +68,14 @@ func newRestorer(
if pvr.Status.Phase == arkv1api.PodVolumeRestorePhaseCompleted || pvr.Status.Phase == arkv1api.PodVolumeRestorePhaseFailed { if pvr.Status.Phase == arkv1api.PodVolumeRestorePhaseCompleted || pvr.Status.Phase == arkv1api.PodVolumeRestorePhaseFailed {
r.resultsLock.Lock() r.resultsLock.Lock()
r.results[resultsKey(pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name)] <- pvr defer r.resultsLock.Unlock()
r.resultsLock.Unlock()
resChan, ok := r.results[resultsKey(pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name)]
if !ok {
log.Errorf("No results channel found for pod %s/%s to send pod volume restore %s/%s on", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name, pvr.Namespace, pvr.Name)
return
}
resChan <- pvr
} }
}, },
}, },
@@ -85,11 +91,16 @@ func (r *restorer) RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.P
return nil return nil
} }
repo, err := getReadyRepo(r.repoLister, restore.Namespace, pod.Namespace) repo, err := r.repoEnsurer.EnsureRepo(r.ctx, restore.Namespace, pod.Namespace)
if err != nil { if err != nil {
return []error{err} return []error{err}
} }
// get a single non-exclusive lock since we'll wait for all individual
// restores to be complete before releasing it.
r.repoManager.repoLocker.Lock(pod.Namespace)
defer r.repoManager.repoLocker.Unlock(pod.Namespace)
resultsChan := make(chan *arkv1api.PodVolumeRestore) resultsChan := make(chan *arkv1api.PodVolumeRestore)
r.resultsLock.Lock() r.resultsLock.Lock()
@@ -102,9 +113,6 @@ func (r *restorer) RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.P
) )
for volume, snapshot := range volumesToRestore { for volume, snapshot := range volumesToRestore {
r.repoManager.repoLocker.Lock(pod.Namespace)
defer r.repoManager.repoLocker.Unlock(pod.Namespace)
volumeRestore := newPodVolumeRestore(restore, pod, volume, snapshot, repo.Spec.ResticIdentifier) volumeRestore := newPodVolumeRestore(restore, pod, volume, snapshot, repo.Spec.ResticIdentifier)
if err := errorOnly(r.repoManager.arkClient.ArkV1().PodVolumeRestores(volumeRestore.Namespace).Create(volumeRestore)); err != nil { if err := errorOnly(r.repoManager.arkClient.ArkV1().PodVolumeRestores(volumeRestore.Namespace).Create(volumeRestore)); err != nil {