From 65ed8da4b70ae8aea4da32e95adda7bdb8d0bb4b Mon Sep 17 00:00:00 2001 From: Steve Kriss Date: Wed, 6 Jun 2018 14:32:28 -0700 Subject: [PATCH] add ResticRepository CRD and move repo-level actions to a controller Signed-off-by: Steve Kriss --- docs/cli-reference/ark_restic.md | 2 +- .../ark_restic_init-repository.md | 6 +- examples/common/00-prereqs.yaml | 15 + pkg/apis/ark/v1/register.go | 2 + pkg/apis/ark/v1/restic_repository.go | 72 +++++ pkg/apis/ark/v1/zz_generated.deepcopy.go | 95 ++++++ pkg/cmd/cli/restic/init_repository.go | 46 ++- pkg/cmd/cli/restic/init_repository_test.go | 10 +- pkg/cmd/server/server.go | 37 +-- pkg/controller/generic_controller.go | 4 + .../restic_repository_controller.go | 276 ++++++++++++++++++ .../versioned/typed/ark/v1/ark_client.go | 5 + .../typed/ark/v1/fake/fake_ark_client.go | 4 + .../ark/v1/fake/fake_resticrepository.go | 140 +++++++++ .../typed/ark/v1/generated_expansion.go | 2 + .../typed/ark/v1/resticrepository.go | 174 +++++++++++ .../externalversions/ark/v1/interface.go | 7 + .../ark/v1/resticrepository.go | 89 ++++++ .../informers/externalversions/generic.go | 2 + .../listers/ark/v1/expansion_generated.go | 8 + .../listers/ark/v1/resticrepository.go | 94 ++++++ pkg/install/config.go | 10 +- pkg/install/crd.go | 1 + pkg/restic/backupper.go | 32 +- pkg/restic/common.go | 4 +- pkg/restic/config.go | 71 +++++ pkg/restic/repository_manager.go | 188 ++---------- pkg/restic/restorer.go | 2 +- 28 files changed, 1190 insertions(+), 208 deletions(-) create mode 100644 pkg/apis/ark/v1/restic_repository.go create mode 100644 pkg/controller/restic_repository_controller.go create mode 100644 pkg/generated/clientset/versioned/typed/ark/v1/fake/fake_resticrepository.go create mode 100644 pkg/generated/clientset/versioned/typed/ark/v1/resticrepository.go create mode 100644 pkg/generated/informers/externalversions/ark/v1/resticrepository.go create mode 100644 pkg/generated/listers/ark/v1/resticrepository.go create mode 100644 pkg/restic/config.go diff --git a/docs/cli-reference/ark_restic.md b/docs/cli-reference/ark_restic.md index 16bbad33c..b4ee4dd71 100644 --- a/docs/cli-reference/ark_restic.md +++ b/docs/cli-reference/ark_restic.md @@ -30,6 +30,6 @@ Work with restic repositories ### SEE ALSO * [ark](ark.md) - Back up and restore Kubernetes cluster resources. -* [ark restic init-repository](ark_restic_init-repository.md) - create an encryption key for a restic repository +* [ark restic init-repository](ark_restic_init-repository.md) - initialize a restic repository for a specified namespace * [ark restic server](ark_restic_server.md) - Run the ark restic server diff --git a/docs/cli-reference/ark_restic_init-repository.md b/docs/cli-reference/ark_restic_init-repository.md index 2704ea0e2..bab4ebe47 100644 --- a/docs/cli-reference/ark_restic_init-repository.md +++ b/docs/cli-reference/ark_restic_init-repository.md @@ -1,14 +1,14 @@ ## ark restic init-repository -create an encryption key for a restic repository +initialize a restic repository for a specified namespace ### Synopsis -create an encryption key for a restic repository +initialize a restic repository for a specified namespace ``` -ark restic init-repository [flags] +ark restic init-repository NAMESPACE [flags] ``` ### Options diff --git a/examples/common/00-prereqs.yaml b/examples/common/00-prereqs.yaml index c7b5c4b45..53660427e 100644 --- a/examples/common/00-prereqs.yaml +++ b/examples/common/00-prereqs.yaml @@ -132,6 +132,21 @@ spec: plural: podvolumerestores kind: PodVolumeRestore +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: resticrepositories.ark.heptio.com + labels: + component: ark +spec: + group: ark.heptio.com + version: v1 + scope: Namespaced + names: + plural: resticrepositories + kind: ResticRepository + --- apiVersion: v1 kind: Namespace diff --git a/pkg/apis/ark/v1/register.go b/pkg/apis/ark/v1/register.go index 5bf88b63e..db92547c9 100644 --- a/pkg/apis/ark/v1/register.go +++ b/pkg/apis/ark/v1/register.go @@ -59,6 +59,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &PodVolumeBackupList{}, &PodVolumeRestore{}, &PodVolumeRestoreList{}, + &ResticRepository{}, + &ResticRepositoryList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/pkg/apis/ark/v1/restic_repository.go b/pkg/apis/ark/v1/restic_repository.go new file mode 100644 index 000000000..236cf5143 --- /dev/null +++ b/pkg/apis/ark/v1/restic_repository.go @@ -0,0 +1,72 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// ResticRepositorySpec is the specification for a ResticRepository. +type ResticRepositorySpec struct { + // MaintenanceFrequency is how often maintenance should be run. + MaintenanceFrequency metav1.Duration `json:"maintenanceFrequency"` + + // ResticIdentifier is the full restic-compatible string for identifying + // this repository. + ResticIdentifier string `json:"resticIdentifier"` +} + +// ResticRepositoryPhase represents the lifecycle phase of a ResticRepository. +type ResticRepositoryPhase string + +const ( + ResticRepositoryPhaseNew ResticRepositoryPhase = "New" + ResticRepositoryPhaseReady ResticRepositoryPhase = "Ready" + ResticRepositoryPhaseNotReady ResticRepositoryPhase = "NotReady" +) + +// ResticRepositoryStatus is the current status of a ResticRepository. +type ResticRepositoryStatus struct { + // Phase is the current state of the ResticRepository. + Phase ResticRepositoryPhase `json:"phase"` + + // Message is a message about the current status of the ResticRepository. + Message string `json:"message"` + + // LastMaintenanceTime is the last time maintenance was run. + LastMaintenanceTime metav1.Time `json:"lastMaintenanceTime"` +} + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type ResticRepository struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata"` + + Spec ResticRepositorySpec `json:"spec"` + Status ResticRepositoryStatus `json:"status,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ResticRepositoryList is a list of ResticRepositories. +type ResticRepositoryList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + Items []ResticRepository `json:"items"` +} diff --git a/pkg/apis/ark/v1/zz_generated.deepcopy.go b/pkg/apis/ark/v1/zz_generated.deepcopy.go index 062b2bcc0..0a364302f 100644 --- a/pkg/apis/ark/v1/zz_generated.deepcopy.go +++ b/pkg/apis/ark/v1/zz_generated.deepcopy.go @@ -843,6 +843,101 @@ func (in *PodVolumeRestoreStatus) DeepCopy() *PodVolumeRestoreStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ResticRepository) DeepCopyInto(out *ResticRepository) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResticRepository. +func (in *ResticRepository) DeepCopy() *ResticRepository { + if in == nil { + return nil + } + out := new(ResticRepository) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ResticRepository) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ResticRepositoryList) DeepCopyInto(out *ResticRepositoryList) { + *out = *in + out.TypeMeta = in.TypeMeta + out.ListMeta = in.ListMeta + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]ResticRepository, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResticRepositoryList. +func (in *ResticRepositoryList) DeepCopy() *ResticRepositoryList { + if in == nil { + return nil + } + out := new(ResticRepositoryList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ResticRepositoryList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ResticRepositorySpec) DeepCopyInto(out *ResticRepositorySpec) { + *out = *in + out.MaintenanceFrequency = in.MaintenanceFrequency + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResticRepositorySpec. +func (in *ResticRepositorySpec) DeepCopy() *ResticRepositorySpec { + if in == nil { + return nil + } + out := new(ResticRepositorySpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ResticRepositoryStatus) DeepCopyInto(out *ResticRepositoryStatus) { + *out = *in + in.LastMaintenanceTime.DeepCopyInto(&out.LastMaintenanceTime) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResticRepositoryStatus. +func (in *ResticRepositoryStatus) DeepCopy() *ResticRepositoryStatus { + if in == nil { + return nil + } + out := new(ResticRepositoryStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Restore) DeepCopyInto(out *Restore) { *out = *in diff --git a/pkg/cmd/cli/restic/init_repository.go b/pkg/cmd/cli/restic/init_repository.go index 18fd77f82..4f8dd3c87 100644 --- a/pkg/cmd/cli/restic/init_repository.go +++ b/pkg/cmd/cli/restic/init_repository.go @@ -19,26 +19,31 @@ package restic import ( "crypto/rand" - "github.com/heptio/ark/pkg/client" - "github.com/heptio/ark/pkg/cmd" - "github.com/heptio/ark/pkg/restic" - "github.com/heptio/ark/pkg/util/filesystem" "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/pflag" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kclientset "k8s.io/client-go/kubernetes" + + "github.com/heptio/ark/pkg/apis/ark/v1" + "github.com/heptio/ark/pkg/client" + "github.com/heptio/ark/pkg/cmd" + clientset "github.com/heptio/ark/pkg/generated/clientset/versioned" + "github.com/heptio/ark/pkg/restic" + "github.com/heptio/ark/pkg/util/filesystem" ) func NewInitRepositoryCommand(f client.Factory) *cobra.Command { o := NewInitRepositoryOptions() c := &cobra.Command{ - Use: "init-repository", - Short: "create an encryption key for a restic repository", - Long: "create an encryption key for a restic repository", + Use: "init-repository NAMESPACE", + Short: "initialize a restic repository for a specified namespace", + Long: "initialize a restic repository for a specified namespace", + Args: cobra.ExactArgs(1), Run: func(c *cobra.Command, args []string) { - cmd.CheckError(o.Complete(f)) + cmd.CheckError(o.Complete(f, args)) cmd.CheckError(o.Validate(f)) cmd.CheckError(o.Run(f)) }, @@ -57,6 +62,7 @@ type InitRepositoryOptions struct { fileSystem filesystem.Interface kubeClient kclientset.Interface + arkClient clientset.Interface keyBytes []byte } @@ -78,7 +84,7 @@ func (o *InitRepositoryOptions) BindFlags(flags *pflag.FlagSet) { flags.IntVar(&o.KeySize, "key-size", o.KeySize, "Size of the generated key for the restic repository") } -func (o *InitRepositoryOptions) Complete(f client.Factory) error { +func (o *InitRepositoryOptions) Complete(f client.Factory, args []string) error { if o.KeyFile != "" && o.KeyData != "" { return errKeyFileAndKeyDataProvided } @@ -87,7 +93,7 @@ func (o *InitRepositoryOptions) Complete(f client.Factory) error { return errKeySizeTooSmall } - o.Namespace = f.Namespace() + o.Namespace = args[0] switch { case o.KeyFile != "": @@ -122,9 +128,27 @@ func (o *InitRepositoryOptions) Validate(f client.Factory) error { return err } + arkClient, err := f.Client() + if err != nil { + return err + } + o.arkClient = arkClient + return nil } func (o *InitRepositoryOptions) Run(f client.Factory) error { - return restic.NewRepositoryKey(o.kubeClient.CoreV1(), o.Namespace, o.keyBytes) + if err := restic.NewRepositoryKey(o.kubeClient.CoreV1(), o.Namespace, o.keyBytes); err != nil { + return err + } + + repo := &v1.ResticRepository{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: f.Namespace(), + Name: o.Namespace, + }, + } + + _, err := o.arkClient.ArkV1().ResticRepositories(f.Namespace()).Create(repo) + return errors.Wrap(err, "error creating ResticRepository") } diff --git a/pkg/cmd/cli/restic/init_repository_test.go b/pkg/cmd/cli/restic/init_repository_test.go index b8d3929e9..6a7b4584e 100644 --- a/pkg/cmd/cli/restic/init_repository_test.go +++ b/pkg/cmd/cli/restic/init_repository_test.go @@ -36,7 +36,7 @@ func (f *fakeFactory) Namespace() string { func TestComplete(t *testing.T) { // no key options provided should error o := &InitRepositoryOptions{} - err := o.Complete(&fakeFactory{}) + err := o.Complete(&fakeFactory{}, []string{"ns"}) assert.EqualError(t, err, errKeySizeTooSmall.Error()) // both KeyFile and KeyData provided should error @@ -44,7 +44,7 @@ func TestComplete(t *testing.T) { KeyFile: "/foo", KeyData: "bar", } - err = o.Complete(&fakeFactory{}) + err = o.Complete(&fakeFactory{}, []string{"ns"}) assert.EqualError(t, err, errKeyFileAndKeyDataProvided.Error()) // if KeyFile is provided, its contents are used @@ -53,20 +53,20 @@ func TestComplete(t *testing.T) { KeyFile: "/foo", fileSystem: arktest.NewFakeFileSystem().WithFile("/foo", fileContents), } - assert.NoError(t, o.Complete(&fakeFactory{})) + assert.NoError(t, o.Complete(&fakeFactory{}, []string{"ns"})) assert.Equal(t, fileContents, o.keyBytes) // if KeyData is provided, it's used o = &InitRepositoryOptions{ KeyData: "bar", } - assert.NoError(t, o.Complete(&fakeFactory{})) + assert.NoError(t, o.Complete(&fakeFactory{}, []string{"ns"})) assert.Equal(t, []byte(o.KeyData), o.keyBytes) // if KeySize is provided, a random key is generated o = &InitRepositoryOptions{ KeySize: 10, } - assert.NoError(t, o.Complete(&fakeFactory{})) + assert.NoError(t, o.Complete(&fakeFactory{}, []string{"ns"})) assert.Equal(t, o.KeySize, len(o.keyBytes)) } diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 4caa57008..e452c8201 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -235,7 +235,6 @@ func (s *server) run() error { if err := s.initRestic(config.BackupStorageProvider); err != nil { return err } - s.runResticMaintenance() // warn if restic daemonset does not exist _, err := s.kubeClient.AppsV1().DaemonSets(s.namespace).Get("restic", metav1.GetOptions{}) @@ -253,20 +252,6 @@ func (s *server) run() error { return nil } -func (s *server) runResticMaintenance() { - go func() { - interval := time.Hour - - <-time.After(interval) - - wait.Forever(func() { - if err := s.resticManager.PruneAllRepos(); err != nil { - s.logger.WithError(err).Error("error pruning repos") - } - }, interval) - }() -} - func (s *server) ensureArkNamespace() error { logContext := s.logger.WithField("namespace", s.namespace) @@ -491,11 +476,11 @@ func (s *server) initRestic(config api.ObjectStorageProviderConfig) error { res, err := restic.NewRepositoryManager( s.ctx, - s.objectStore, config, s.arkClient, secretsInformer, s.kubeClient.CoreV1(), + s.sharedInformerFactory.Ark().V1().ResticRepositories(), s.logger, ) if err != nil { @@ -503,8 +488,7 @@ func (s *server) initRestic(config api.ObjectStorageProviderConfig) error { } s.resticManager = res - s.logger.Info("Checking restic repositories") - return s.resticManager.CheckAllRepos() + return nil } func (s *server) runControllers(config *api.Config) error { @@ -689,6 +673,23 @@ func (s *server) runControllers(config *api.Config) error { wg.Done() }() + if s.resticManager != nil { + resticRepoController := controller.NewResticRepositoryController( + s.logger, + s.sharedInformerFactory.Ark().V1().ResticRepositories(), + s.arkClient.ArkV1(), + config.BackupStorageProvider, + s.resticManager, + ) + wg.Add(1) + go func() { + // TODO only having a single worker may be an issue since maintenance + // can take a long time. + resticRepoController.Run(ctx, 1) + wg.Done() + }() + } + // SHARED INFORMERS HAVE TO BE STARTED AFTER ALL CONTROLLERS go s.sharedInformerFactory.Start(ctx.Done()) diff --git a/pkg/controller/generic_controller.go b/pkg/controller/generic_controller.go index c83dd55f0..e18b3e243 100644 --- a/pkg/controller/generic_controller.go +++ b/pkg/controller/generic_controller.go @@ -151,3 +151,7 @@ func (c *genericController) enqueue(obj interface{}) { c.queue.Add(key) } + +func (c *genericController) enqueueSecond(_, obj interface{}) { + c.enqueue(obj) +} diff --git a/pkg/controller/restic_repository_controller.go b/pkg/controller/restic_repository_controller.go new file mode 100644 index 000000000..16378e60f --- /dev/null +++ b/pkg/controller/restic_repository_controller.go @@ -0,0 +1,276 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "encoding/json" + "time" + + jsonpatch "github.com/evanphx/json-patch" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/client-go/tools/cache" + + "github.com/heptio/ark/pkg/apis/ark/v1" + arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" + arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" + informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" + listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" + "github.com/heptio/ark/pkg/restic" +) + +type resticRepositoryController struct { + *genericController + + resticRepositoryClient arkv1client.ResticRepositoriesGetter + resticRepositoryLister listers.ResticRepositoryLister + objectStorageConfig arkv1api.ObjectStorageProviderConfig + repositoryManager restic.RepositoryManager + + clock clock.Clock +} + +// NewResticRepositoryController creates a new restic repository controller. +func NewResticRepositoryController( + logger logrus.FieldLogger, + resticRepositoryInformer informers.ResticRepositoryInformer, + resticRepositoryClient arkv1client.ResticRepositoriesGetter, + objectStorageConfig arkv1api.ObjectStorageProviderConfig, + repositoryManager restic.RepositoryManager, +) Interface { + c := &resticRepositoryController{ + genericController: newGenericController("restic-repository", logger), + resticRepositoryClient: resticRepositoryClient, + resticRepositoryLister: resticRepositoryInformer.Lister(), + objectStorageConfig: objectStorageConfig, + repositoryManager: repositoryManager, + clock: &clock.RealClock{}, + } + + c.syncHandler = c.processQueueItem + c.cacheSyncWaiters = append(c.cacheSyncWaiters, resticRepositoryInformer.Informer().HasSynced) + + resticRepositoryInformer.Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.enqueue, + }, + ) + + c.resyncPeriod = 30 * time.Minute + c.resyncFunc = c.enqueueAllRepositories + + return c +} + +// enqueueAllRepositories lists all restic repositories from cache and enqueues all +// of them so we can check each one for maintenance. +func (c *resticRepositoryController) enqueueAllRepositories() { + c.logger.Debug("resticRepositoryController.enqueueAllRepositories") + + repos, err := c.resticRepositoryLister.List(labels.Everything()) + if err != nil { + c.logger.WithError(errors.WithStack(err)).Error("error listing restic repositories") + return + } + + for _, repo := range repos { + c.enqueue(repo) + } +} + +func (c *resticRepositoryController) processQueueItem(key string) error { + log := c.logger.WithField("key", key) + log.Debug("Running processQueueItem") + + ns, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + log.WithError(errors.WithStack(err)).Error("error splitting queue key") + return nil + } + + log = c.logger.WithField("namespace", ns).WithField("name", name) + + req, err := c.resticRepositoryLister.ResticRepositories(ns).Get(name) + if apierrors.IsNotFound(err) { + log.Debug("Unable to find ResticRepository") + return nil + } + if err != nil { + return errors.Wrap(err, "error getting ResticRepository") + } + + // Don't mutate the shared cache + reqCopy := req.DeepCopy() + + switch req.Status.Phase { + case "", v1.ResticRepositoryPhaseNew: + return c.initializeRepo(reqCopy, log) + case v1.ResticRepositoryPhaseReady: + return c.runMaintenanceIfDue(reqCopy, log) + case v1.ResticRepositoryPhaseNotReady: + return c.checkNotReadyRepo(reqCopy, log) + } + + return nil +} + +func (c *resticRepositoryController) initializeRepo(req *v1.ResticRepository, log logrus.FieldLogger) error { + log.Info("Initializing restic repository") + + // defaulting - if the patch fails, return an error so the item is returned to the queue + if err := c.patchResticRepository(req, func(r *v1.ResticRepository) { + r.Spec.ResticIdentifier = restic.GetRepoIdentifier(c.objectStorageConfig, r.Name) + + if r.Spec.MaintenanceFrequency.Duration <= 0 { + r.Spec.MaintenanceFrequency = metav1.Duration{Duration: restic.DefaultMaintenanceFrequency} + } + }); err != nil { + return err + } + + if err := ensureRepo(req.Name, c.repositoryManager); err != nil { + return c.patchResticRepository(req, repoNotReady(err.Error())) + } + + return c.patchResticRepository(req, func(req *v1.ResticRepository) { + req.Status.Phase = v1.ResticRepositoryPhaseReady + req.Status.LastMaintenanceTime = metav1.Time{Time: time.Now()} + }) +} + +// ensureRepo first checks the repo, and returns if check passes. If it fails, +// attempts to init the repo, and returns the result. +func ensureRepo(name string, repoManager restic.RepositoryManager) error { + if repoManager.CheckRepo(name) == nil { + return nil + } + + return repoManager.InitRepo(name) +} + +func (c *resticRepositoryController) runMaintenanceIfDue(req *v1.ResticRepository, log logrus.FieldLogger) error { + log.Debug("resticRepositoryController.runMaintenanceIfDue") + + now := c.clock.Now() + + if !dueForMaintenance(req, now) { + log.Debug("not due for maintenance") + return nil + } + + log.Info("Running maintenance on restic repository") + + log.Debug("Checking repo before prune") + if err := c.repositoryManager.CheckRepo(req.Name); err != nil { + return c.patchResticRepository(req, repoNotReady(err.Error())) + } + + // prune failures should be displayed in the `.status.message` field but + // should not cause the repo to move to `NotReady`. + log.Debug("Pruning repo") + if err := c.repositoryManager.PruneRepo(req.Name); err != nil { + log.WithError(err).Warn("error pruning repository") + if patchErr := c.patchResticRepository(req, func(r *v1.ResticRepository) { + r.Status.Message = err.Error() + }); patchErr != nil { + return patchErr + } + } + + log.Debug("Checking repo after prune") + if err := c.repositoryManager.CheckRepo(req.Name); err != nil { + return c.patchResticRepository(req, repoNotReady(err.Error())) + } + + return c.patchResticRepository(req, func(req *v1.ResticRepository) { + req.Status.LastMaintenanceTime = metav1.Time{Time: now} + }) +} + +func dueForMaintenance(req *v1.ResticRepository, now time.Time) bool { + return req.Status.LastMaintenanceTime.Add(req.Spec.MaintenanceFrequency.Duration).Before(now) +} + +func (c *resticRepositoryController) checkNotReadyRepo(req *v1.ResticRepository, log logrus.FieldLogger) error { + log.Info("Checking restic repository for readiness") + + // we need to ensure it (first check, if check fails, attempt to init) + // because we don't know if it's been successfully initialized yet. + if err := ensureRepo(req.Name, c.repositoryManager); err != nil { + return c.patchResticRepository(req, repoNotReady(err.Error())) + } + + return c.patchResticRepository(req, repoReady()) +} + +func repoNotReady(msg string) func(*v1.ResticRepository) { + return func(r *v1.ResticRepository) { + r.Status.Phase = v1.ResticRepositoryPhaseNotReady + r.Status.Message = msg + } +} + +func repoReady() func(*v1.ResticRepository) { + return func(r *v1.ResticRepository) { + r.Status.Phase = v1.ResticRepositoryPhaseReady + r.Status.Message = "" + } +} + +// patchResticRepository mutates req with the provided mutate function, and patches it +// through the Kube API. After executing this function, req will be updated with both +// the mutation and the results of the Patch() API call. +func (c *resticRepositoryController) patchResticRepository(req *v1.ResticRepository, mutate func(*v1.ResticRepository)) error { + // Record original json + oldData, err := json.Marshal(req) + if err != nil { + return errors.Wrap(err, "error marshalling original ResticRepository") + } + + mutate(req) + + // Record new json + newData, err := json.Marshal(req) + if err != nil { + return errors.Wrap(err, "error marshalling updated ResticRepository") + } + + patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return errors.Wrap(err, "error creating json merge patch for ResticRepository") + } + + // empty patch: don't apply + if string(patchBytes) == "{}" { + return nil + } + + // patch, and if successful, update req + var patched *v1.ResticRepository + if patched, err = c.resticRepositoryClient.ResticRepositories(req.Namespace).Patch(req.Name, types.MergePatchType, patchBytes); err != nil { + return errors.Wrap(err, "error patching ResticRepository") + } + req = patched + + return nil +} diff --git a/pkg/generated/clientset/versioned/typed/ark/v1/ark_client.go b/pkg/generated/clientset/versioned/typed/ark/v1/ark_client.go index 3caa51a9b..184634c47 100644 --- a/pkg/generated/clientset/versioned/typed/ark/v1/ark_client.go +++ b/pkg/generated/clientset/versioned/typed/ark/v1/ark_client.go @@ -33,6 +33,7 @@ type ArkV1Interface interface { DownloadRequestsGetter PodVolumeBackupsGetter PodVolumeRestoresGetter + ResticRepositoriesGetter RestoresGetter SchedulesGetter } @@ -66,6 +67,10 @@ func (c *ArkV1Client) PodVolumeRestores(namespace string) PodVolumeRestoreInterf return newPodVolumeRestores(c, namespace) } +func (c *ArkV1Client) ResticRepositories(namespace string) ResticRepositoryInterface { + return newResticRepositories(c, namespace) +} + func (c *ArkV1Client) Restores(namespace string) RestoreInterface { return newRestores(c, namespace) } diff --git a/pkg/generated/clientset/versioned/typed/ark/v1/fake/fake_ark_client.go b/pkg/generated/clientset/versioned/typed/ark/v1/fake/fake_ark_client.go index a06504732..25f8f0a2e 100644 --- a/pkg/generated/clientset/versioned/typed/ark/v1/fake/fake_ark_client.go +++ b/pkg/generated/clientset/versioned/typed/ark/v1/fake/fake_ark_client.go @@ -52,6 +52,10 @@ func (c *FakeArkV1) PodVolumeRestores(namespace string) v1.PodVolumeRestoreInter return &FakePodVolumeRestores{c, namespace} } +func (c *FakeArkV1) ResticRepositories(namespace string) v1.ResticRepositoryInterface { + return &FakeResticRepositories{c, namespace} +} + func (c *FakeArkV1) Restores(namespace string) v1.RestoreInterface { return &FakeRestores{c, namespace} } diff --git a/pkg/generated/clientset/versioned/typed/ark/v1/fake/fake_resticrepository.go b/pkg/generated/clientset/versioned/typed/ark/v1/fake/fake_resticrepository.go new file mode 100644 index 000000000..ae2b5ce55 --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/ark/v1/fake/fake_resticrepository.go @@ -0,0 +1,140 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + ark_v1 "github.com/heptio/ark/pkg/apis/ark/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeResticRepositories implements ResticRepositoryInterface +type FakeResticRepositories struct { + Fake *FakeArkV1 + ns string +} + +var resticrepositoriesResource = schema.GroupVersionResource{Group: "ark.heptio.com", Version: "v1", Resource: "resticrepositories"} + +var resticrepositoriesKind = schema.GroupVersionKind{Group: "ark.heptio.com", Version: "v1", Kind: "ResticRepository"} + +// Get takes name of the resticRepository, and returns the corresponding resticRepository object, and an error if there is any. +func (c *FakeResticRepositories) Get(name string, options v1.GetOptions) (result *ark_v1.ResticRepository, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(resticrepositoriesResource, c.ns, name), &ark_v1.ResticRepository{}) + + if obj == nil { + return nil, err + } + return obj.(*ark_v1.ResticRepository), err +} + +// List takes label and field selectors, and returns the list of ResticRepositories that match those selectors. +func (c *FakeResticRepositories) List(opts v1.ListOptions) (result *ark_v1.ResticRepositoryList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(resticrepositoriesResource, resticrepositoriesKind, c.ns, opts), &ark_v1.ResticRepositoryList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &ark_v1.ResticRepositoryList{} + for _, item := range obj.(*ark_v1.ResticRepositoryList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested resticRepositories. +func (c *FakeResticRepositories) Watch(opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(resticrepositoriesResource, c.ns, opts)) + +} + +// Create takes the representation of a resticRepository and creates it. Returns the server's representation of the resticRepository, and an error, if there is any. +func (c *FakeResticRepositories) Create(resticRepository *ark_v1.ResticRepository) (result *ark_v1.ResticRepository, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(resticrepositoriesResource, c.ns, resticRepository), &ark_v1.ResticRepository{}) + + if obj == nil { + return nil, err + } + return obj.(*ark_v1.ResticRepository), err +} + +// Update takes the representation of a resticRepository and updates it. Returns the server's representation of the resticRepository, and an error, if there is any. +func (c *FakeResticRepositories) Update(resticRepository *ark_v1.ResticRepository) (result *ark_v1.ResticRepository, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(resticrepositoriesResource, c.ns, resticRepository), &ark_v1.ResticRepository{}) + + if obj == nil { + return nil, err + } + return obj.(*ark_v1.ResticRepository), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeResticRepositories) UpdateStatus(resticRepository *ark_v1.ResticRepository) (*ark_v1.ResticRepository, error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateSubresourceAction(resticrepositoriesResource, "status", c.ns, resticRepository), &ark_v1.ResticRepository{}) + + if obj == nil { + return nil, err + } + return obj.(*ark_v1.ResticRepository), err +} + +// Delete takes name of the resticRepository and deletes it. Returns an error if one occurs. +func (c *FakeResticRepositories) Delete(name string, options *v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteAction(resticrepositoriesResource, c.ns, name), &ark_v1.ResticRepository{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeResticRepositories) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(resticrepositoriesResource, c.ns, listOptions) + + _, err := c.Fake.Invokes(action, &ark_v1.ResticRepositoryList{}) + return err +} + +// Patch applies the patch and returns the patched resticRepository. +func (c *FakeResticRepositories) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *ark_v1.ResticRepository, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(resticrepositoriesResource, c.ns, name, data, subresources...), &ark_v1.ResticRepository{}) + + if obj == nil { + return nil, err + } + return obj.(*ark_v1.ResticRepository), err +} diff --git a/pkg/generated/clientset/versioned/typed/ark/v1/generated_expansion.go b/pkg/generated/clientset/versioned/typed/ark/v1/generated_expansion.go index ee1cb0bdd..e09577a39 100644 --- a/pkg/generated/clientset/versioned/typed/ark/v1/generated_expansion.go +++ b/pkg/generated/clientset/versioned/typed/ark/v1/generated_expansion.go @@ -30,6 +30,8 @@ type PodVolumeBackupExpansion interface{} type PodVolumeRestoreExpansion interface{} +type ResticRepositoryExpansion interface{} + type RestoreExpansion interface{} type ScheduleExpansion interface{} diff --git a/pkg/generated/clientset/versioned/typed/ark/v1/resticrepository.go b/pkg/generated/clientset/versioned/typed/ark/v1/resticrepository.go new file mode 100644 index 000000000..1c1512095 --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/ark/v1/resticrepository.go @@ -0,0 +1,174 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1 + +import ( + v1 "github.com/heptio/ark/pkg/apis/ark/v1" + scheme "github.com/heptio/ark/pkg/generated/clientset/versioned/scheme" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// ResticRepositoriesGetter has a method to return a ResticRepositoryInterface. +// A group's client should implement this interface. +type ResticRepositoriesGetter interface { + ResticRepositories(namespace string) ResticRepositoryInterface +} + +// ResticRepositoryInterface has methods to work with ResticRepository resources. +type ResticRepositoryInterface interface { + Create(*v1.ResticRepository) (*v1.ResticRepository, error) + Update(*v1.ResticRepository) (*v1.ResticRepository, error) + UpdateStatus(*v1.ResticRepository) (*v1.ResticRepository, error) + Delete(name string, options *meta_v1.DeleteOptions) error + DeleteCollection(options *meta_v1.DeleteOptions, listOptions meta_v1.ListOptions) error + Get(name string, options meta_v1.GetOptions) (*v1.ResticRepository, error) + List(opts meta_v1.ListOptions) (*v1.ResticRepositoryList, error) + Watch(opts meta_v1.ListOptions) (watch.Interface, error) + Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.ResticRepository, err error) + ResticRepositoryExpansion +} + +// resticRepositories implements ResticRepositoryInterface +type resticRepositories struct { + client rest.Interface + ns string +} + +// newResticRepositories returns a ResticRepositories +func newResticRepositories(c *ArkV1Client, namespace string) *resticRepositories { + return &resticRepositories{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the resticRepository, and returns the corresponding resticRepository object, and an error if there is any. +func (c *resticRepositories) Get(name string, options meta_v1.GetOptions) (result *v1.ResticRepository, err error) { + result = &v1.ResticRepository{} + err = c.client.Get(). + Namespace(c.ns). + Resource("resticrepositories"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of ResticRepositories that match those selectors. +func (c *resticRepositories) List(opts meta_v1.ListOptions) (result *v1.ResticRepositoryList, err error) { + result = &v1.ResticRepositoryList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("resticrepositories"). + VersionedParams(&opts, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested resticRepositories. +func (c *resticRepositories) Watch(opts meta_v1.ListOptions) (watch.Interface, error) { + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("resticrepositories"). + VersionedParams(&opts, scheme.ParameterCodec). + Watch() +} + +// Create takes the representation of a resticRepository and creates it. Returns the server's representation of the resticRepository, and an error, if there is any. +func (c *resticRepositories) Create(resticRepository *v1.ResticRepository) (result *v1.ResticRepository, err error) { + result = &v1.ResticRepository{} + err = c.client.Post(). + Namespace(c.ns). + Resource("resticrepositories"). + Body(resticRepository). + Do(). + Into(result) + return +} + +// Update takes the representation of a resticRepository and updates it. Returns the server's representation of the resticRepository, and an error, if there is any. +func (c *resticRepositories) Update(resticRepository *v1.ResticRepository) (result *v1.ResticRepository, err error) { + result = &v1.ResticRepository{} + err = c.client.Put(). + Namespace(c.ns). + Resource("resticrepositories"). + Name(resticRepository.Name). + Body(resticRepository). + Do(). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). + +func (c *resticRepositories) UpdateStatus(resticRepository *v1.ResticRepository) (result *v1.ResticRepository, err error) { + result = &v1.ResticRepository{} + err = c.client.Put(). + Namespace(c.ns). + Resource("resticrepositories"). + Name(resticRepository.Name). + SubResource("status"). + Body(resticRepository). + Do(). + Into(result) + return +} + +// Delete takes name of the resticRepository and deletes it. Returns an error if one occurs. +func (c *resticRepositories) Delete(name string, options *meta_v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("resticrepositories"). + Name(name). + Body(options). + Do(). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *resticRepositories) DeleteCollection(options *meta_v1.DeleteOptions, listOptions meta_v1.ListOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("resticrepositories"). + VersionedParams(&listOptions, scheme.ParameterCodec). + Body(options). + Do(). + Error() +} + +// Patch applies the patch and returns the patched resticRepository. +func (c *resticRepositories) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.ResticRepository, err error) { + result = &v1.ResticRepository{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("resticrepositories"). + SubResource(subresources...). + Name(name). + Body(data). + Do(). + Into(result) + return +} diff --git a/pkg/generated/informers/externalversions/ark/v1/interface.go b/pkg/generated/informers/externalversions/ark/v1/interface.go index c1d0e69d2..b197b2a66 100644 --- a/pkg/generated/informers/externalversions/ark/v1/interface.go +++ b/pkg/generated/informers/externalversions/ark/v1/interface.go @@ -36,6 +36,8 @@ type Interface interface { PodVolumeBackups() PodVolumeBackupInformer // PodVolumeRestores returns a PodVolumeRestoreInformer. PodVolumeRestores() PodVolumeRestoreInformer + // ResticRepositories returns a ResticRepositoryInformer. + ResticRepositories() ResticRepositoryInformer // Restores returns a RestoreInformer. Restores() RestoreInformer // Schedules returns a ScheduleInformer. @@ -83,6 +85,11 @@ func (v *version) PodVolumeRestores() PodVolumeRestoreInformer { return &podVolumeRestoreInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// ResticRepositories returns a ResticRepositoryInformer. +func (v *version) ResticRepositories() ResticRepositoryInformer { + return &resticRepositoryInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // Restores returns a RestoreInformer. func (v *version) Restores() RestoreInformer { return &restoreInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/pkg/generated/informers/externalversions/ark/v1/resticrepository.go b/pkg/generated/informers/externalversions/ark/v1/resticrepository.go new file mode 100644 index 000000000..f30004e53 --- /dev/null +++ b/pkg/generated/informers/externalversions/ark/v1/resticrepository.go @@ -0,0 +1,89 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1 + +import ( + time "time" + + ark_v1 "github.com/heptio/ark/pkg/apis/ark/v1" + versioned "github.com/heptio/ark/pkg/generated/clientset/versioned" + internalinterfaces "github.com/heptio/ark/pkg/generated/informers/externalversions/internalinterfaces" + v1 "github.com/heptio/ark/pkg/generated/listers/ark/v1" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// ResticRepositoryInformer provides access to a shared informer and lister for +// ResticRepositories. +type ResticRepositoryInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1.ResticRepositoryLister +} + +type resticRepositoryInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewResticRepositoryInformer constructs a new informer for ResticRepository type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewResticRepositoryInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredResticRepositoryInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredResticRepositoryInformer constructs a new informer for ResticRepository type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredResticRepositoryInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.ArkV1().ResticRepositories(namespace).List(options) + }, + WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.ArkV1().ResticRepositories(namespace).Watch(options) + }, + }, + &ark_v1.ResticRepository{}, + resyncPeriod, + indexers, + ) +} + +func (f *resticRepositoryInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredResticRepositoryInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *resticRepositoryInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&ark_v1.ResticRepository{}, f.defaultInformer) +} + +func (f *resticRepositoryInformer) Lister() v1.ResticRepositoryLister { + return v1.NewResticRepositoryLister(f.Informer().GetIndexer()) +} diff --git a/pkg/generated/informers/externalversions/generic.go b/pkg/generated/informers/externalversions/generic.go index 4ad3b38d6..6e1c22334 100644 --- a/pkg/generated/informers/externalversions/generic.go +++ b/pkg/generated/informers/externalversions/generic.go @@ -65,6 +65,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Ark().V1().PodVolumeBackups().Informer()}, nil case v1.SchemeGroupVersion.WithResource("podvolumerestores"): return &genericInformer{resource: resource.GroupResource(), informer: f.Ark().V1().PodVolumeRestores().Informer()}, nil + case v1.SchemeGroupVersion.WithResource("resticrepositories"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Ark().V1().ResticRepositories().Informer()}, nil case v1.SchemeGroupVersion.WithResource("restores"): return &genericInformer{resource: resource.GroupResource(), informer: f.Ark().V1().Restores().Informer()}, nil case v1.SchemeGroupVersion.WithResource("schedules"): diff --git a/pkg/generated/listers/ark/v1/expansion_generated.go b/pkg/generated/listers/ark/v1/expansion_generated.go index 9b5a03f5d..9b4a1d5aa 100644 --- a/pkg/generated/listers/ark/v1/expansion_generated.go +++ b/pkg/generated/listers/ark/v1/expansion_generated.go @@ -66,6 +66,14 @@ type PodVolumeRestoreListerExpansion interface{} // PodVolumeRestoreNamespaceLister. type PodVolumeRestoreNamespaceListerExpansion interface{} +// ResticRepositoryListerExpansion allows custom methods to be added to +// ResticRepositoryLister. +type ResticRepositoryListerExpansion interface{} + +// ResticRepositoryNamespaceListerExpansion allows custom methods to be added to +// ResticRepositoryNamespaceLister. +type ResticRepositoryNamespaceListerExpansion interface{} + // RestoreListerExpansion allows custom methods to be added to // RestoreLister. type RestoreListerExpansion interface{} diff --git a/pkg/generated/listers/ark/v1/resticrepository.go b/pkg/generated/listers/ark/v1/resticrepository.go new file mode 100644 index 000000000..9fc95fa29 --- /dev/null +++ b/pkg/generated/listers/ark/v1/resticrepository.go @@ -0,0 +1,94 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1 + +import ( + v1 "github.com/heptio/ark/pkg/apis/ark/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// ResticRepositoryLister helps list ResticRepositories. +type ResticRepositoryLister interface { + // List lists all ResticRepositories in the indexer. + List(selector labels.Selector) (ret []*v1.ResticRepository, err error) + // ResticRepositories returns an object that can list and get ResticRepositories. + ResticRepositories(namespace string) ResticRepositoryNamespaceLister + ResticRepositoryListerExpansion +} + +// resticRepositoryLister implements the ResticRepositoryLister interface. +type resticRepositoryLister struct { + indexer cache.Indexer +} + +// NewResticRepositoryLister returns a new ResticRepositoryLister. +func NewResticRepositoryLister(indexer cache.Indexer) ResticRepositoryLister { + return &resticRepositoryLister{indexer: indexer} +} + +// List lists all ResticRepositories in the indexer. +func (s *resticRepositoryLister) List(selector labels.Selector) (ret []*v1.ResticRepository, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1.ResticRepository)) + }) + return ret, err +} + +// ResticRepositories returns an object that can list and get ResticRepositories. +func (s *resticRepositoryLister) ResticRepositories(namespace string) ResticRepositoryNamespaceLister { + return resticRepositoryNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// ResticRepositoryNamespaceLister helps list and get ResticRepositories. +type ResticRepositoryNamespaceLister interface { + // List lists all ResticRepositories in the indexer for a given namespace. + List(selector labels.Selector) (ret []*v1.ResticRepository, err error) + // Get retrieves the ResticRepository from the indexer for a given namespace and name. + Get(name string) (*v1.ResticRepository, error) + ResticRepositoryNamespaceListerExpansion +} + +// resticRepositoryNamespaceLister implements the ResticRepositoryNamespaceLister +// interface. +type resticRepositoryNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all ResticRepositories in the indexer for a given namespace. +func (s resticRepositoryNamespaceLister) List(selector labels.Selector) (ret []*v1.ResticRepository, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1.ResticRepository)) + }) + return ret, err +} + +// Get retrieves the ResticRepository from the indexer for a given namespace and name. +func (s resticRepositoryNamespaceLister) Get(name string) (*v1.ResticRepository, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1.Resource("resticrepository"), name) + } + return obj.(*v1.ResticRepository), nil +} diff --git a/pkg/install/config.go b/pkg/install/config.go index c3f896d16..93690d61b 100644 --- a/pkg/install/config.go +++ b/pkg/install/config.go @@ -31,6 +31,7 @@ type arkConfig struct { gcSyncPeriod time.Duration podVolumeOperationTimeout time.Duration restoreOnly bool + resticLocation string } func WithBackupSyncPeriod(t time.Duration) arkConfigOption { @@ -57,6 +58,12 @@ func WithRestoreOnly() arkConfigOption { } } +func WithResticLocation(location string) arkConfigOption { + return func(c *arkConfig) { + c.resticLocation = location + } +} + func Config( namespace string, pvCloudProviderName string, @@ -87,7 +94,8 @@ func Config( Name: backupCloudProviderName, Config: backupCloudProviderConfig, }, - Bucket: bucket, + Bucket: bucket, + ResticLocation: c.resticLocation, }, BackupSyncPeriod: metav1.Duration{ Duration: c.backupSyncPeriod, diff --git a/pkg/install/crd.go b/pkg/install/crd.go index 1e960c704..2f7a9a6b0 100644 --- a/pkg/install/crd.go +++ b/pkg/install/crd.go @@ -36,6 +36,7 @@ func CRDs() []*apiextv1beta1.CustomResourceDefinition { crd("DeleteBackupRequest", "deletebackuprequests"), crd("PodVolumeBackup", "podvolumebackups"), crd("PodVolumeRestore", "podvolumerestores"), + crd("ResticRepository", "resticrepositories"), } } diff --git a/pkg/restic/backupper.go b/pkg/restic/backupper.go index 3edcc93f9..389a688fd 100644 --- a/pkg/restic/backupper.go +++ b/pkg/restic/backupper.go @@ -25,10 +25,12 @@ import ( "github.com/sirupsen/logrus" corev1api "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" + arkv1listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" "github.com/heptio/ark/pkg/util/boolptr" ) @@ -39,18 +41,26 @@ type Backupper interface { } type backupper struct { - repoManager *repositoryManager ctx context.Context + repoManager *repositoryManager + repoLister arkv1listers.ResticRepositoryLister results map[string]chan *arkv1api.PodVolumeBackup resultsLock sync.Mutex } -func newBackupper(ctx context.Context, repoManager *repositoryManager, podVolumeBackupInformer cache.SharedIndexInformer) *backupper { +func newBackupper( + ctx context.Context, + repoManager *repositoryManager, + podVolumeBackupInformer cache.SharedIndexInformer, + repoLister arkv1listers.ResticRepositoryLister, +) *backupper { b := &backupper{ - repoManager: repoManager, ctx: ctx, - results: make(map[string]chan *arkv1api.PodVolumeBackup), + repoManager: repoManager, + repoLister: repoLister, + + results: make(map[string]chan *arkv1api.PodVolumeBackup), } podVolumeBackupInformer.AddEventHandler( @@ -81,9 +91,15 @@ func (b *backupper) BackupPodVolumes(backup *arkv1api.Backup, pod *corev1api.Pod return nil, nil } - // ensure a repo exists for the pod's namespace - if err := b.repoManager.ensureRepo(pod.Namespace); err != nil { - return nil, []error{err} + repo, err := b.repoLister.ResticRepositories(backup.Namespace).Get(pod.Namespace) + if apierrors.IsNotFound(err) { + return nil, []error{errors.Wrapf(err, "restic repository not found")} + } + if err != nil { + return nil, []error{errors.Wrapf(err, "error getting restic repository")} + } + if repo.Status.Phase != arkv1api.ResticRepositoryPhaseReady { + return nil, []error{errors.New("restic repository not ready")} } resultsChan := make(chan *arkv1api.PodVolumeBackup) @@ -101,7 +117,7 @@ func (b *backupper) BackupPodVolumes(backup *arkv1api.Backup, pod *corev1api.Pod b.repoManager.repoLocker.Lock(pod.Namespace) defer b.repoManager.repoLocker.Unlock(pod.Namespace) - volumeBackup := newPodVolumeBackup(backup, pod, volumeName, b.repoManager.config.repoPrefix) + volumeBackup := newPodVolumeBackup(backup, pod, volumeName, b.repoManager.repoPrefix) if err := errorOnly(b.repoManager.arkClient.ArkV1().PodVolumeBackups(volumeBackup.Namespace).Create(volumeBackup)); err != nil { errs = append(errs, err) diff --git a/pkg/restic/common.go b/pkg/restic/common.go index 02c654d31..b6dd3ebaa 100644 --- a/pkg/restic/common.go +++ b/pkg/restic/common.go @@ -20,6 +20,7 @@ import ( "fmt" "io/ioutil" "strings" + "time" "github.com/pkg/errors" @@ -32,7 +33,8 @@ import ( ) const ( - InitContainer = "restic-wait" + InitContainer = "restic-wait" + DefaultMaintenanceFrequency = 24 * time.Hour podAnnotationPrefix = "snapshot.ark.heptio.com/" volumesToBackupAnnotation = "backup.ark.heptio.com/backup-volumes" diff --git a/pkg/restic/config.go b/pkg/restic/config.go new file mode 100644 index 000000000..0b801ca86 --- /dev/null +++ b/pkg/restic/config.go @@ -0,0 +1,71 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package restic + +import ( + "fmt" + "strings" + + arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" +) + +// getRepoPrefix returns the prefix of the value of the --repo flag for +// restic commands, i.e. everything except the "/". +func getRepoPrefix(config arkv1api.ObjectStorageProviderConfig) string { + if BackendType(config.Name) == AWSBackend { + var url string + switch { + // non-AWS, S3-compatible object store + case config.Config["s3Url"] != "": + url = config.Config["s3Url"] + default: + url = "s3.amazonaws.com" + } + + return fmt.Sprintf("s3:%s/%s", url, config.ResticLocation) + } + + var ( + parts = strings.SplitN(config.ResticLocation, "/", 2) + bucket, path string + ) + + if len(parts) >= 1 { + bucket = parts[0] + } + if len(parts) >= 2 { + path = parts[1] + } + + var prefix string + switch BackendType(config.Name) { + case AzureBackend: + prefix = "azure" + case GCPBackend: + prefix = "gs" + } + + return fmt.Sprintf("%s:%s:/%s", prefix, bucket, path) +} + +// GetRepoIdentifier returns the string to be used as the value of the --repo flag in +// restic commands for the given repository. +func GetRepoIdentifier(config arkv1api.ObjectStorageProviderConfig, name string) string { + prefix := getRepoPrefix(config) + + return fmt.Sprintf("%s/%s", strings.TrimSuffix(prefix, "/"), name) +} diff --git a/pkg/restic/repository_manager.go b/pkg/restic/repository_manager.go index 064e13483..f26e4a125 100644 --- a/pkg/restic/repository_manager.go +++ b/pkg/restic/repository_manager.go @@ -21,39 +21,32 @@ import ( "fmt" "os" "os/exec" - "strings" "github.com/pkg/errors" "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kerrs "k8s.io/apimachinery/pkg/util/errors" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" arkv1api "github.com/heptio/ark/pkg/apis/ark/v1" - "github.com/heptio/ark/pkg/cloudprovider" clientset "github.com/heptio/ark/pkg/generated/clientset/versioned" arkv1informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" - "github.com/heptio/ark/pkg/util/sync" + arkv1listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" ) // RepositoryManager executes commands against restic repositories. type RepositoryManager interface { + // InitRepo initializes a repo with the specified name. + InitRepo(name string) error + // CheckRepo checks the specified repo for errors. CheckRepo(name string) error - // CheckAllRepos checks all repos for errors. - CheckAllRepos() error - // PruneRepo deletes unused data from a repo. PruneRepo(name string) error - // PruneAllRepos deletes unused data from all - // repos. - PruneAllRepos() error - // Forget removes a snapshot from the list of // available snapshots in a repo. Forget(snapshot SnapshotIdentifier) error @@ -86,78 +79,35 @@ const ( ) type repositoryManager struct { - objectStore cloudprovider.ObjectStore - config config - arkClient clientset.Interface - secretsLister corev1listers.SecretLister - secretsClient corev1client.SecretsGetter - log logrus.FieldLogger - repoLocker *repoLocker -} - -type config struct { - repoPrefix string - bucket string - path string -} - -func getConfig(objectStorageConfig arkv1api.ObjectStorageProviderConfig) config { - var ( - c = config{} - parts = strings.SplitN(objectStorageConfig.ResticLocation, "/", 2) - ) - - switch len(parts) { - case 0: - case 1: - c.bucket = parts[0] - default: - c.bucket = parts[0] - c.path = parts[1] - } - - switch BackendType(objectStorageConfig.Name) { - case AWSBackend: - var url string - switch { - // non-AWS, S3-compatible object store - case objectStorageConfig.Config != nil && objectStorageConfig.Config["s3Url"] != "": - url = objectStorageConfig.Config["s3Url"] - default: - url = "s3.amazonaws.com" - } - - c.repoPrefix = fmt.Sprintf("s3:%s/%s", url, c.bucket) - if c.path != "" { - c.repoPrefix += "/" + c.path - } - case AzureBackend: - c.repoPrefix = fmt.Sprintf("azure:%s:/%s", c.bucket, c.path) - case GCPBackend: - c.repoPrefix = fmt.Sprintf("gs:%s:/%s", c.bucket, c.path) - } - - return c + repoPrefix string + arkClient clientset.Interface + secretsLister corev1listers.SecretLister + secretsClient corev1client.SecretsGetter + repoLister arkv1listers.ResticRepositoryLister + repoInformerSynced cache.InformerSynced + log logrus.FieldLogger + repoLocker *repoLocker } // NewRepositoryManager constructs a RepositoryManager. func NewRepositoryManager( ctx context.Context, - objectStore cloudprovider.ObjectStore, config arkv1api.ObjectStorageProviderConfig, arkClient clientset.Interface, secretsInformer cache.SharedIndexInformer, secretsClient corev1client.SecretsGetter, + repoInformer arkv1informers.ResticRepositoryInformer, log logrus.FieldLogger, ) (RepositoryManager, error) { rm := &repositoryManager{ - objectStore: objectStore, - config: getConfig(config), - arkClient: arkClient, - secretsLister: corev1listers.NewSecretLister(secretsInformer.GetIndexer()), - secretsClient: secretsClient, - log: log, - repoLocker: newRepoLocker(), + repoPrefix: getRepoPrefix(config), + arkClient: arkClient, + secretsLister: corev1listers.NewSecretLister(secretsInformer.GetIndexer()), + secretsClient: secretsClient, + repoLister: repoInformer.Lister(), + repoInformerSynced: repoInformer.Informer().HasSynced, + log: log, + repoLocker: newRepoLocker(), } if !cache.WaitForCacheSync(ctx.Done(), secretsInformer.HasSynced) { @@ -178,11 +128,11 @@ func (rm *repositoryManager) NewBackupper(ctx context.Context, backup *arkv1api. }, ) - b := newBackupper(ctx, rm, informer) + b := newBackupper(ctx, rm, informer, rm.repoLister) go informer.Run(ctx.Done()) - if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { - return nil, errors.New("timed out waiting for cache to sync") + if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced, rm.repoInformerSynced) { + return nil, errors.New("timed out waiting for caches to sync") } return b, nil @@ -209,98 +159,18 @@ func (rm *repositoryManager) NewRestorer(ctx context.Context, restore *arkv1api. return r, nil } -func (rm *repositoryManager) ensureRepo(name string) error { - repos, err := rm.getAllRepos() - if err != nil { - return err - } - - for _, repo := range repos { - if repo == name { - return nil - } - } - +func (rm *repositoryManager) InitRepo(name string) error { rm.repoLocker.LockExclusive(name) defer rm.repoLocker.UnlockExclusive(name) - // init the repo - cmd := InitCommand(rm.config.repoPrefix, name) - - return errorOnly(rm.exec(cmd)) -} - -func (rm *repositoryManager) getAllRepos() ([]string, error) { - // TODO support rm.config.path - prefixes, err := rm.objectStore.ListCommonPrefixes(rm.config.bucket, "/") - if err != nil { - return nil, err - } - - var repos []string - for _, prefix := range prefixes { - if len(prefix) <= 1 { - continue - } - - // strip the trailing '/' if it exists - repos = append(repos, strings.TrimSuffix(prefix, "/")) - } - - return repos, nil -} - -func (rm *repositoryManager) CheckAllRepos() error { - repos, err := rm.getAllRepos() - if err != nil { - return err - } - - var eg sync.ErrorGroup - for _, repo := range repos { - this := repo - eg.Go(func() error { - rm.log.WithField("repo", this).Debugf("Checking repo %s", this) - return rm.CheckRepo(this) - }) - } - - return kerrs.NewAggregate(eg.Wait()) -} - -func (rm *repositoryManager) PruneAllRepos() error { - repos, err := rm.getAllRepos() - if err != nil { - return err - } - - var eg sync.ErrorGroup - for _, repo := range repos { - this := repo - eg.Go(func() error { - rm.log.WithField("repo", this).Debugf("Pre-prune checking repo %s", this) - if err := rm.CheckRepo(this); err != nil { - return err - } - - rm.log.WithField("repo", this).Debugf("Pruning repo %s", this) - if err := rm.PruneRepo(this); err != nil { - return err - } - - rm.log.WithField("repo", this).Debugf("Post-prune checking repo %s", this) - return rm.CheckRepo(this) - }) - } - - return kerrs.NewAggregate(eg.Wait()) + return errorOnly(rm.exec(InitCommand(rm.repoPrefix, name))) } func (rm *repositoryManager) CheckRepo(name string) error { rm.repoLocker.LockExclusive(name) defer rm.repoLocker.UnlockExclusive(name) - cmd := CheckCommand(rm.config.repoPrefix, name) + cmd := CheckCommand(rm.repoPrefix, name) return errorOnly(rm.exec(cmd)) } @@ -309,7 +179,7 @@ func (rm *repositoryManager) PruneRepo(name string) error { rm.repoLocker.LockExclusive(name) defer rm.repoLocker.UnlockExclusive(name) - cmd := PruneCommand(rm.config.repoPrefix, name) + cmd := PruneCommand(rm.repoPrefix, name) return errorOnly(rm.exec(cmd)) } @@ -318,7 +188,7 @@ func (rm *repositoryManager) Forget(snapshot SnapshotIdentifier) error { rm.repoLocker.LockExclusive(snapshot.Repo) defer rm.repoLocker.UnlockExclusive(snapshot.Repo) - cmd := ForgetCommand(rm.config.repoPrefix, snapshot.Repo, snapshot.SnapshotID) + cmd := ForgetCommand(rm.repoPrefix, snapshot.Repo, snapshot.SnapshotID) return errorOnly(rm.exec(cmd)) } diff --git a/pkg/restic/restorer.go b/pkg/restic/restorer.go index 2cc9eebb6..893b54291 100644 --- a/pkg/restic/restorer.go +++ b/pkg/restic/restorer.go @@ -91,7 +91,7 @@ func (r *restorer) RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.P r.repoManager.repoLocker.Lock(pod.Namespace) defer r.repoManager.repoLocker.Unlock(pod.Namespace) - volumeRestore := newPodVolumeRestore(restore, pod, volume, snapshot, r.repoManager.config.repoPrefix) + volumeRestore := newPodVolumeRestore(restore, pod, volume, snapshot, r.repoManager.repoPrefix) if err := errorOnly(r.repoManager.arkClient.ArkV1().PodVolumeRestores(volumeRestore.Namespace).Create(volumeRestore)); err != nil { errs = append(errs, errors.WithStack(err))