From 74f60b1ee1db4a6f1a6785b33b9b58a3f582dd60 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Wed, 14 Mar 2018 14:17:27 -0400 Subject: [PATCH] Switch backup finalizer to DeleteBackupRequest We ran into a lot of problems using a finalizer on the backup to allow the Ark server to clean up all associated backup data when deleting a backup. Users also found it less than desirable that deleting the heptio-ark namespace resulted in all the backup data being deleted. This removes the finalizer and replaces it with an explicit DeleteBackupRequest that is created as a means of requesting the deletion of a backup and all its associated data. This is what `ark backup delete` does. If you use kubectl to delete a backup or to delete the heptio-ark namespace, this no longer deletes associated backups. Additionally, as long as the heptio-ark namespace still exists, the Ark server's BackupSyncController will continually sync backups into the heptio-ark namespace from object storage. Signed-off-by: Andy Goldstein --- pkg/apis/ark/v1/backup.go | 5 +- pkg/apis/ark/v1/constants.go | 4 - pkg/apis/ark/v1/delete_backup_request.go | 67 +++ pkg/apis/ark/v1/register.go | 2 + pkg/apis/ark/v1/zz_generated.deepcopy.go | 100 ++++ pkg/backup/delete_helpers.go | 47 ++ pkg/cmd/cli/backup/delete.go | 91 +--- pkg/cmd/cli/backup/describe.go | 14 +- pkg/cmd/server/server.go | 102 +++- pkg/cmd/util/output/backup_describer.go | 68 ++- pkg/controller/backup_controller.go | 6 - pkg/controller/backup_controller_test.go | 16 +- pkg/controller/backup_deletion_controller.go | 295 ++++++++++++ .../backup_deletion_controller_test.go | 391 +++++++++++++++ pkg/controller/gc_controller.go | 232 +++------ pkg/controller/gc_controller_test.go | 455 ++++++------------ pkg/controller/generic_controller.go | 153 ++++++ .../versioned/typed/ark/v1/ark_client.go | 5 + .../typed/ark/v1/deletebackuprequest.go | 171 +++++++ .../typed/ark/v1/fake/fake_ark_client.go | 4 + .../ark/v1/fake/fake_deletebackuprequest.go | 137 ++++++ .../typed/ark/v1/generated_expansion.go | 2 + .../ark/v1/deletebackuprequest.go | 88 ++++ .../externalversions/ark/v1/interface.go | 7 + .../informers/externalversions/generic.go | 2 + .../listers/ark/v1/deletebackuprequest.go | 94 ++++ .../listers/ark/v1/expansion_generated.go | 8 + pkg/util/test/test_backup.go | 5 - 28 files changed, 1960 insertions(+), 611 deletions(-) create mode 100644 pkg/apis/ark/v1/delete_backup_request.go create mode 100644 pkg/backup/delete_helpers.go create mode 100644 pkg/controller/backup_deletion_controller.go create mode 100644 pkg/controller/backup_deletion_controller_test.go create mode 100644 pkg/controller/generic_controller.go create mode 100644 pkg/generated/clientset/versioned/typed/ark/v1/deletebackuprequest.go create mode 100644 pkg/generated/clientset/versioned/typed/ark/v1/fake/fake_deletebackuprequest.go create mode 100644 pkg/generated/informers/externalversions/ark/v1/deletebackuprequest.go create mode 100644 pkg/generated/listers/ark/v1/deletebackuprequest.go diff --git a/pkg/apis/ark/v1/backup.go b/pkg/apis/ark/v1/backup.go index d16364d95..b871d6d77 100644 --- a/pkg/apis/ark/v1/backup.go +++ b/pkg/apis/ark/v1/backup.go @@ -143,9 +143,12 @@ const ( // errors. BackupPhaseCompleted BackupPhase = "Completed" - // BackupPhaseFailed mean the backup ran but encountered an error that + // BackupPhaseFailed means the backup ran but encountered an error that // prevented it from completing successfully. BackupPhaseFailed BackupPhase = "Failed" + + // BackupPhaseDeleting means the backup and all its associated data are being deleted. + BackupPhaseDeleting BackupPhase = "Deleting" ) // BackupStatus captures the current status of an Ark backup. diff --git a/pkg/apis/ark/v1/constants.go b/pkg/apis/ark/v1/constants.go index e4157a338..0f87c91c1 100644 --- a/pkg/apis/ark/v1/constants.go +++ b/pkg/apis/ark/v1/constants.go @@ -37,8 +37,4 @@ const ( // NamespaceScopedDir is the name of the directory containing namespace-scoped // resource within an Ark backup. NamespaceScopedDir = "namespaces" - - // GCFinalizer is the name of the finalizer Ark uses for backups to allow for the GC Controller to - // delete all resources associated with a backup. - GCFinalizer = "gc.ark.heptio.com" ) diff --git a/pkg/apis/ark/v1/delete_backup_request.go b/pkg/apis/ark/v1/delete_backup_request.go new file mode 100644 index 000000000..cfd1571d9 --- /dev/null +++ b/pkg/apis/ark/v1/delete_backup_request.go @@ -0,0 +1,67 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + +// DeleteBackupRequestSpec is the specification for which backups to delete. +type DeleteBackupRequestSpec struct { + BackupName string `json:"backupName"` +} + +// DeleteBackupRequestPhase represents the lifecycle phase of a DeleteBackupRequest. +type DeleteBackupRequestPhase string + +const ( + // DeleteBackupRequestPhaseNew means the DeleteBackupRequest has not been processed yet. + DeleteBackupRequestPhaseNew DeleteBackupRequestPhase = "New" + // DeleteBackupRequestPhaseInProgress means the DeleteBackupRequest is being processed. + DeleteBackupRequestPhaseInProgress DeleteBackupRequestPhase = "InProgress" + // DeleteBackupRequestPhaseProcessed means the DeleteBackupRequest has been processed. + DeleteBackupRequestPhaseProcessed DeleteBackupRequestPhase = "Processed" + + BackupNameLabel = "ark.heptio.com/backup-name" +) + +// DeleteBackupRequestStatus is the current status of a DeleteBackupRequest. +type DeleteBackupRequestStatus struct { + // Phase is the current state of the DeleteBackupRequest. + Phase DeleteBackupRequestPhase `json:"phase"` + // Errors contains any errors that were encountered during the deletion process. + Errors []string `json:"errors"` +} + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// DeleteBackupRequest is a request to delete one or more backups. +type DeleteBackupRequest struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata"` + + Spec DeleteBackupRequestSpec `json:"spec"` + Status DeleteBackupRequestStatus `json:"status,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// DeleteBackupRequestList is a list of DeleteBackupRequests. +type DeleteBackupRequestList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + Items []DeleteBackupRequest `json:"items"` +} diff --git a/pkg/apis/ark/v1/register.go b/pkg/apis/ark/v1/register.go index 99a3b6352..66a76baa6 100644 --- a/pkg/apis/ark/v1/register.go +++ b/pkg/apis/ark/v1/register.go @@ -53,6 +53,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &ConfigList{}, &DownloadRequest{}, &DownloadRequestList{}, + &DeleteBackupRequest{}, + &DeleteBackupRequestList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/pkg/apis/ark/v1/zz_generated.deepcopy.go b/pkg/apis/ark/v1/zz_generated.deepcopy.go index 181a46175..3ae75ce76 100644 --- a/pkg/apis/ark/v1/zz_generated.deepcopy.go +++ b/pkg/apis/ark/v1/zz_generated.deepcopy.go @@ -403,6 +403,106 @@ func (in *ConfigList) DeepCopyObject() runtime.Object { } } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DeleteBackupRequest) DeepCopyInto(out *DeleteBackupRequest) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeleteBackupRequest. +func (in *DeleteBackupRequest) DeepCopy() *DeleteBackupRequest { + if in == nil { + return nil + } + out := new(DeleteBackupRequest) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *DeleteBackupRequest) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } else { + return nil + } +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DeleteBackupRequestList) DeepCopyInto(out *DeleteBackupRequestList) { + *out = *in + out.TypeMeta = in.TypeMeta + out.ListMeta = in.ListMeta + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]DeleteBackupRequest, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeleteBackupRequestList. +func (in *DeleteBackupRequestList) DeepCopy() *DeleteBackupRequestList { + if in == nil { + return nil + } + out := new(DeleteBackupRequestList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *DeleteBackupRequestList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } else { + return nil + } +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DeleteBackupRequestSpec) DeepCopyInto(out *DeleteBackupRequestSpec) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeleteBackupRequestSpec. +func (in *DeleteBackupRequestSpec) DeepCopy() *DeleteBackupRequestSpec { + if in == nil { + return nil + } + out := new(DeleteBackupRequestSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DeleteBackupRequestStatus) DeepCopyInto(out *DeleteBackupRequestStatus) { + *out = *in + if in.Errors != nil { + in, out := &in.Errors, &out.Errors + *out = make([]string, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeleteBackupRequestStatus. +func (in *DeleteBackupRequestStatus) DeepCopy() *DeleteBackupRequestStatus { + if in == nil { + return nil + } + out := new(DeleteBackupRequestStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DownloadRequest) DeepCopyInto(out *DownloadRequest) { *out = *in diff --git a/pkg/backup/delete_helpers.go b/pkg/backup/delete_helpers.go new file mode 100644 index 000000000..cf2535189 --- /dev/null +++ b/pkg/backup/delete_helpers.go @@ -0,0 +1,47 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package backup + +import ( + "fmt" + + "github.com/heptio/ark/pkg/apis/ark/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// NewDeleteBackupRequest creates a DeleteBackupRequest for the backup identified by name. +func NewDeleteBackupRequest(name string) *v1.DeleteBackupRequest { + return &v1.DeleteBackupRequest{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: name + "-", + Labels: map[string]string{ + v1.BackupNameLabel: name, + }, + }, + Spec: v1.DeleteBackupRequestSpec{ + BackupName: name, + }, + } +} + +// NewDeleteBackupRequestListOptions creates a ListOptions with a label selector configured to +// find DeleteBackupRequests for the backup identified by name. +func NewDeleteBackupRequestListOptions(name string) metav1.ListOptions { + return metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", v1.BackupNameLabel, name), + } +} diff --git a/pkg/cmd/cli/backup/delete.go b/pkg/cmd/cli/backup/delete.go index 866d2ee58..31aa8702d 100644 --- a/pkg/cmd/cli/backup/delete.go +++ b/pkg/cmd/cli/backup/delete.go @@ -18,22 +18,16 @@ package backup import ( "bufio" - "encoding/json" + "errors" "fmt" "os" "strings" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - - "github.com/heptio/ark/pkg/apis/ark/v1" - "github.com/heptio/ark/pkg/controller" + "github.com/heptio/ark/pkg/backup" clientset "github.com/heptio/ark/pkg/generated/clientset/versioned" - kubeutil "github.com/heptio/ark/pkg/util/kube" - "github.com/heptio/ark/pkg/util/stringslice" - "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/pflag" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/heptio/ark/pkg/client" "github.com/heptio/ark/pkg/cmd" @@ -60,7 +54,6 @@ func NewDeleteCommand(f client.Factory, use string) *cobra.Command { type DeleteOptions struct { Name string - Force bool Confirm bool client clientset.Interface @@ -68,52 +61,45 @@ type DeleteOptions struct { } func (o *DeleteOptions) BindFlags(flags *pflag.FlagSet) { - flags.BoolVar(&o.Force, "force", o.Force, "Forcefully delete the backup, potentially leaving orphaned cloud resources") - flags.BoolVar(&o.Confirm, "confirm", o.Confirm, "Confirm forceful deletion") + flags.BoolVar(&o.Confirm, "confirm", o.Confirm, "Confirm deletion") } func (o *DeleteOptions) Validate(c *cobra.Command, args []string, f client.Factory) error { - kubeClient, err := f.KubeClient() - if err != nil { - return err + if o.client == nil { + return errors.New("Ark client is not set; unable to proceed") } - serverVersion, err := kubeutil.ServerVersion(kubeClient.Discovery()) + _, err := o.client.ArkV1().Backups(f.Namespace()).Get(args[0], metav1.GetOptions{}) if err != nil { return err } - if !serverVersion.AtLeast(controller.MinVersionForDelete) { - return errors.Errorf("this command requires the Kubernetes server version to be at least %s", controller.MinVersionForDelete) - } - return nil } func (o *DeleteOptions) Complete(f client.Factory, args []string) error { o.Name = args[0] - var err error - o.client, err = f.Client() + o.namespace = f.Namespace() + + client, err := f.Client() if err != nil { return err } - - o.namespace = f.Namespace() + o.client = client return nil } func (o *DeleteOptions) Run() error { - if o.Force { - return o.forceDelete() + if !o.Confirm && !getConfirmation() { + // Don't do anything unless we get confirmation + return nil } - return o.normalDelete() -} + deleteRequest := backup.NewDeleteBackupRequest(o.Name) -func (o *DeleteOptions) normalDelete() error { - if err := o.client.ArkV1().Backups(o.namespace).Delete(o.Name, nil); err != nil { + if _, err := o.client.ArkV1().DeleteBackupRequests(o.namespace).Create(deleteRequest); err != nil { return err } @@ -121,55 +107,10 @@ func (o *DeleteOptions) normalDelete() error { return nil } -func (o *DeleteOptions) forceDelete() error { - backup, err := o.client.ArkV1().Backups(o.namespace).Get(o.Name, metav1.GetOptions{}) - if err != nil { - return errors.WithStack(err) - } - - if !o.Confirm { - // If the user didn't specify --confirm, we need to prompt for it - if !getConfirmation() { - return nil - } - } - - // Step 1: patch to remove our finalizer, if it's there - if stringslice.Has(backup.Finalizers, v1.GCFinalizer) { - patchMap := map[string]interface{}{ - "metadata": map[string]interface{}{ - "finalizers": stringslice.Except(backup.Finalizers, v1.GCFinalizer), - "resourceVersion": backup.ResourceVersion, - }, - } - - patchBytes, err := json.Marshal(patchMap) - if err != nil { - return errors.WithStack(err) - } - - if _, err = o.client.ArkV1().Backups(backup.Namespace).Patch(backup.Name, types.MergePatchType, patchBytes); err != nil { - return errors.WithStack(err) - } - } - - // Step 2: issue the delete ONLY if it has never been issued before - if backup.DeletionTimestamp == nil { - if err = o.client.ArkV1().Backups(backup.Namespace).Delete(backup.Name, nil); err != nil { - return errors.WithStack(err) - } - } - - fmt.Printf("Backup %q force-deleted.\n", backup.Name) - - return nil -} - func getConfirmation() bool { reader := bufio.NewReader(os.Stdin) for { - fmt.Println("WARNING: forcing deletion of a backup may result in resources in the cloud (disk snapshots, backup files) becoming orphaned.") fmt.Printf("Are you sure you want to continue (Y/N)? ") confirmation, err := reader.ReadString('\n') diff --git a/pkg/cmd/cli/backup/describe.go b/pkg/cmd/cli/backup/describe.go index cb9f3f4d5..96c73f9bd 100644 --- a/pkg/cmd/cli/backup/describe.go +++ b/pkg/cmd/cli/backup/describe.go @@ -18,7 +18,9 @@ package backup import ( "fmt" + "os" + pkgbackup "github.com/heptio/ark/pkg/backup" "github.com/spf13/cobra" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -53,7 +55,17 @@ func NewDescribeCommand(f client.Factory, use string) *cobra.Command { first := true for _, backup := range backups.Items { - s := output.DescribeBackup(&backup) + var deleteRequests []v1.DeleteBackupRequest + if backup.Status.Phase == v1.BackupPhaseDeleting { + deleteRequestListOptions := pkgbackup.NewDeleteBackupRequestListOptions(backup.Name) + deleteRequestList, err := arkClient.ArkV1().DeleteBackupRequests(f.Namespace()).List(deleteRequestListOptions) + if err != nil { + fmt.Fprintf(os.Stderr, "error getting DeleteBackupRequests for backup %s: %v\n", backup.Name, err) + } + deleteRequests = deleteRequestList.Items + } + + s := output.DescribeBackup(&backup, deleteRequests) if first { first = false fmt.Print(s) diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 9c5bd125d..9d762aee0 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -18,6 +18,7 @@ package server import ( "context" + "encoding/json" "fmt" "io/ioutil" "reflect" @@ -35,6 +36,8 @@ import ( "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" @@ -57,8 +60,8 @@ import ( "github.com/heptio/ark/pkg/plugin" "github.com/heptio/ark/pkg/restore" "github.com/heptio/ark/pkg/util/kube" - kubeutil "github.com/heptio/ark/pkg/util/kube" "github.com/heptio/ark/pkg/util/logging" + "github.com/heptio/ark/pkg/util/stringslice" ) func NewCommand() *cobra.Command { @@ -471,7 +474,7 @@ func (s *server) runControllers(config *api.Config) error { ) if config.RestoreOnlyMode { - s.logger.Info("Restore only mode - not starting the backup, schedule or GC controllers") + s.logger.Info("Restore only mode - not starting the backup, schedule, delete-backup, or GC controllers") } else { backupper, err := newBackupper(discoveryHelper, s.clientPool, s.backupService, s.snapshotService, s.kubeClientConfig, s.kubeClient.CoreV1()) cmd.CheckError(err) @@ -505,31 +508,35 @@ func (s *server) runControllers(config *api.Config) error { wg.Done() }() - serverVersion, err := kubeutil.ServerVersion(s.kubeClient.Discovery()) - if err != nil { - return err - } + gcController := controller.NewGCController( + s.logger, + s.sharedInformerFactory.Ark().V1().Backups(), + s.arkClient.ArkV1(), + config.GCSyncPeriod.Duration, + ) + wg.Add(1) + go func() { + gcController.Run(ctx, 1) + wg.Done() + }() + + backupDeletionController := controller.NewBackupDeletionController( + s.logger, + s.sharedInformerFactory.Ark().V1().DeleteBackupRequests(), + s.arkClient.ArkV1(), // deleteBackupRequestClient + s.arkClient.ArkV1(), // backupClient + s.snapshotService, + s.backupService, + config.BackupStorageProvider.Bucket, + s.sharedInformerFactory.Ark().V1().Restores(), + s.arkClient.ArkV1(), // restoreClient + ) + wg.Add(1) + go func() { + backupDeletionController.Run(ctx, 1) + wg.Done() + }() - if !serverVersion.AtLeast(controller.MinVersionForDelete) { - s.logger.Errorf("Garbage-collection is disabled because it requires the Kubernetes server version to be at least %s", controller.MinVersionForDelete) - } else { - gcController := controller.NewGCController( - s.backupService, - s.snapshotService, - config.BackupStorageProvider.Bucket, - config.GCSyncPeriod.Duration, - s.sharedInformerFactory.Ark().V1().Backups(), - s.arkClient.ArkV1(), - s.sharedInformerFactory.Ark().V1().Restores(), - s.arkClient.ArkV1(), - s.logger, - ) - wg.Add(1) - go func() { - gcController.Run(ctx, 1) - wg.Done() - }() - } } restorer, err := newRestorer( @@ -579,6 +586,10 @@ func (s *server) runControllers(config *api.Config) error { // SHARED INFORMERS HAVE TO BE STARTED AFTER ALL CONTROLLERS go s.sharedInformerFactory.Start(ctx.Done()) + // Remove this sometime after v0.8.0 + cache.WaitForCacheSync(ctx.Done(), s.sharedInformerFactory.Ark().V1().Backups().Informer().HasSynced) + s.removeDeprecatedGCFinalizer() + s.logger.Info("Server started successfully") <-ctx.Done() @@ -589,6 +600,45 @@ func (s *server) runControllers(config *api.Config) error { return nil } +const gcFinalizer = "gc.ark.heptio.com" + +func (s *server) removeDeprecatedGCFinalizer() { + backups, err := s.sharedInformerFactory.Ark().V1().Backups().Lister().List(labels.Everything()) + if err != nil { + s.logger.WithError(errors.WithStack(err)).Error("error listing backups from cache - unable to remove old finalizers") + return + } + + for _, backup := range backups { + log := s.logger.WithField("backup", kube.NamespaceAndName(backup)) + + if !stringslice.Has(backup.Finalizers, gcFinalizer) { + log.Debug("backup doesn't have deprecated finalizer - skipping") + continue + } + + log.Info("removing deprecated finalizer from backup") + + patch := map[string]interface{}{ + "metadata": map[string]interface{}{ + "finalizers": stringslice.Except(backup.Finalizers, gcFinalizer), + "resourceVersion": backup.ResourceVersion, + }, + } + + patchBytes, err := json.Marshal(patch) + if err != nil { + log.WithError(errors.WithStack(err)).Error("error marshaling finalizers patch") + continue + } + + _, err = s.arkClient.ArkV1().Backups(backup.Namespace).Patch(backup.Name, types.MergePatchType, patchBytes) + if err != nil { + log.WithError(errors.WithStack(err)).Error("error marshaling finalizers patch") + } + } +} + func newBackupper( discoveryHelper arkdiscovery.Helper, clientPool dynamic.ClientPool, diff --git a/pkg/cmd/util/output/backup_describer.go b/pkg/cmd/util/output/backup_describer.go index 8a6985cd9..20895f574 100644 --- a/pkg/cmd/util/output/backup_describer.go +++ b/pkg/cmd/util/output/backup_describer.go @@ -24,19 +24,36 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func DescribeBackup(backup *v1.Backup) string { +// DescribeBackup describes a backup in human-readable format. +func DescribeBackup(backup *v1.Backup, deleteRequests []v1.DeleteBackupRequest) string { return Describe(func(d *Describer) { d.DescribeMetadata(backup.ObjectMeta) + d.Println() + phase := backup.Status.Phase + if phase == "" { + phase = v1.BackupPhaseNew + } + d.Printf("Phase:\t%s", phase) + if count := failedDeletionCount(deleteRequests); count > 0 { + d.Printf(" (%d failed attempt(s))", count) + } + d.Println() + d.Println() DescribeBackupSpec(d, backup.Spec) d.Println() - deleting := backup.DeletionTimestamp != nil && !backup.DeletionTimestamp.Time.IsZero() - DescribeBackupStatus(d, backup.Status, deleting) + DescribeBackupStatus(d, backup.Status) + + if len(deleteRequests) > 0 { + d.Println() + DescribeDeleteBackupRequests(d, deleteRequests) + } }) } +// DescribeBackupSpec describes a backup spec in human-readable format. func DescribeBackupSpec(d *Describer, spec v1.BackupSpec) { // TODO make a helper for this and use it in all the describers. d.Printf("Namespaces:\n") @@ -144,17 +161,8 @@ func DescribeBackupSpec(d *Describer, spec v1.BackupSpec) { } -func DescribeBackupStatus(d *Describer, status v1.BackupStatus, deleting bool) { - phase := status.Phase - if phase == "" { - phase = v1.BackupPhaseNew - } - if deleting { - phase = "Deleting" - } - d.Printf("Phase:\t%s\n", phase) - - d.Println() +// DescribeBackupStatus describes a backup status in human-readable format. +func DescribeBackupStatus(d *Describer, status v1.BackupStatus) { d.Printf("Backup Format Version:\t%d\n", status.Version) d.Println() @@ -188,3 +196,35 @@ func DescribeBackupStatus(d *Describer, status v1.BackupStatus, deleting bool) { } } } + +// DescribeDeleteBackupRequests describes delete backup requests in human-readable format. +func DescribeDeleteBackupRequests(d *Describer, requests []v1.DeleteBackupRequest) { + d.Printf("Deletion Attempts:\n") + + started := false + for _, req := range requests { + if !started { + started = true + } else { + d.Println() + } + + d.Printf("\t%s: %s\n", req.CreationTimestamp.String(), req.Status.Phase) + if len(req.Status.Errors) > 0 { + d.Printf("\tErrors:\n") + for _, err := range req.Status.Errors { + d.Printf("\t\t%s\n", err) + } + } + } +} + +func failedDeletionCount(requests []v1.DeleteBackupRequest) int { + var count int + for _, req := range requests { + if req.Status.Phase == v1.DeleteBackupRequestPhaseProcessed && len(req.Status.Errors) > 0 { + count++ + } + } + return count +} diff --git a/pkg/controller/backup_controller.go b/pkg/controller/backup_controller.go index 4efc5b34d..df8685ce0 100644 --- a/pkg/controller/backup_controller.go +++ b/pkg/controller/backup_controller.go @@ -49,7 +49,6 @@ import ( "github.com/heptio/ark/pkg/util/collections" "github.com/heptio/ark/pkg/util/encode" kubeutil "github.com/heptio/ark/pkg/util/kube" - "github.com/heptio/ark/pkg/util/stringslice" ) const backupVersion = 1 @@ -237,11 +236,6 @@ func (controller *backupController) processBackup(key string) error { // set backup version backup.Status.Version = backupVersion - // add GC finalizer if it's not there already - if !stringslice.Has(backup.Finalizers, api.GCFinalizer) { - backup.Finalizers = append(backup.Finalizers, api.GCFinalizer) - } - // calculate expiration if backup.Spec.TTL.Duration > 0 { backup.Status.Expiration = metav1.NewTime(controller.clock.Now().Add(backup.Spec.TTL.Duration)) diff --git a/pkg/controller/backup_controller_test.go b/pkg/controller/backup_controller_test.go index 4c2b40cb4..a2a1a098d 100644 --- a/pkg/controller/backup_controller_test.go +++ b/pkg/controller/backup_controller_test.go @@ -190,7 +190,6 @@ func TestProcessBackup(t *testing.T) { backup.Status.Phase = v1.BackupPhaseInProgress backup.Status.Expiration.Time = expiration backup.Status.Version = 1 - backup.Finalizers = []string{v1.GCFinalizer} backupper.On("Backup", backup, mock.Anything, mock.Anything, mock.Anything).Return(nil) cloudBackups.On("UploadBackup", "bucket", backup.Name, mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -226,7 +225,6 @@ func TestProcessBackup(t *testing.T) { res.Status.Version = 1 res.Status.Expiration.Time = expiration res.Status.Phase = v1.BackupPhase(phase) - res.Finalizers = []string{v1.GCFinalizer} return true, res, nil }) @@ -249,15 +247,15 @@ func TestProcessBackup(t *testing.T) { actions := client.Actions() require.Equal(t, 2, len(actions)) - // validate Patch call 1 (setting finalizer, version, expiration, and phase) + // validate Patch call 1 (setting version, expiration, and phase) patchAction, ok := actions[0].(core.PatchAction) require.True(t, ok, "action is not a PatchAction") patch := make(map[string]interface{}) require.NoError(t, json.Unmarshal(patchAction.GetPatch(), &patch), "cannot unmarshal patch") - // should have metadata and status - assert.Equal(t, 2, len(patch), "patch has wrong number of keys") + // should have status + assert.Equal(t, 1, len(patch), "patch has wrong number of keys") expectedStatusKeys := 2 if test.backup.Spec.TTL.Duration > 0 { @@ -271,14 +269,6 @@ func TestProcessBackup(t *testing.T) { res, _ := collections.GetMap(patch, "status") assert.Equal(t, expectedStatusKeys, len(res), "patch's status has the wrong number of keys") - finalizers, err := collections.GetSlice(patch, "metadata.finalizers") - require.NoError(t, err, "patch does not contain metadata.finalizers") - assert.Equal(t, 1, len(finalizers)) - assert.Equal(t, v1.GCFinalizer, finalizers[0]) - - res, _ = collections.GetMap(patch, "metadata") - assert.Equal(t, 1, len(res), "patch's metadata has the wrong number of keys") - // validate Patch call 2 (setting phase) patchAction, ok = actions[1].(core.PatchAction) require.True(t, ok, "action is not a PatchAction") diff --git a/pkg/controller/backup_deletion_controller.go b/pkg/controller/backup_deletion_controller.go new file mode 100644 index 000000000..1fbc8d945 --- /dev/null +++ b/pkg/controller/backup_deletion_controller.go @@ -0,0 +1,295 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "encoding/json" + + jsonpatch "github.com/evanphx/json-patch" + "github.com/heptio/ark/pkg/apis/ark/v1" + pkgbackup "github.com/heptio/ark/pkg/backup" + "github.com/heptio/ark/pkg/cloudprovider" + arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" + informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" + listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" + "github.com/heptio/ark/pkg/util/kube" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" +) + +type backupDeletionController struct { + *genericController + + deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter + deleteBackupRequestLister listers.DeleteBackupRequestLister + backupClient arkv1client.BackupsGetter + snapshotService cloudprovider.SnapshotService + backupService cloudprovider.BackupService + bucket string + restoreLister listers.RestoreLister + restoreClient arkv1client.RestoresGetter + + processRequestFunc func(*v1.DeleteBackupRequest) error +} + +// NewBackupDeletionController creates a new backup deletion controller. +func NewBackupDeletionController( + logger logrus.FieldLogger, + deleteBackupRequestInformer informers.DeleteBackupRequestInformer, + deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter, + backupClient arkv1client.BackupsGetter, + snapshotService cloudprovider.SnapshotService, + backupService cloudprovider.BackupService, + bucket string, + restoreInformer informers.RestoreInformer, + restoreClient arkv1client.RestoresGetter, +) Interface { + c := &backupDeletionController{ + genericController: newGenericController("backup-deletion", logger), + deleteBackupRequestClient: deleteBackupRequestClient, + deleteBackupRequestLister: deleteBackupRequestInformer.Lister(), + backupClient: backupClient, + snapshotService: snapshotService, + backupService: backupService, + bucket: bucket, + restoreLister: restoreInformer.Lister(), + restoreClient: restoreClient, + } + + c.syncHandler = c.processQueueItem + c.cacheSyncWaiters = append(c.cacheSyncWaiters, deleteBackupRequestInformer.Informer().HasSynced, restoreInformer.Informer().HasSynced) + c.processRequestFunc = c.processRequest + + deleteBackupRequestInformer.Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.enqueue, + UpdateFunc: func(_, obj interface{}) { c.enqueue(obj) }, + }, + ) + + return c +} + +func (c *backupDeletionController) processQueueItem(key string) error { + log := c.logger.WithField("key", key) + log.Debug("Running processItem") + + ns, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return errors.Wrap(err, "error splitting queue key") + } + + req, err := c.deleteBackupRequestLister.DeleteBackupRequests(ns).Get(name) + if apierrors.IsNotFound(err) { + log.Debug("Unable to find DeleteBackupRequest") + return nil + } + if err != nil { + return errors.Wrap(err, "error getting DeleteBackupRequest") + } + + switch req.Status.Phase { + case v1.DeleteBackupRequestPhaseProcessed: + // Don't do anything because it's already been processed + default: + // Don't mutate the shared cache + reqCopy := req.DeepCopy() + return c.processRequestFunc(reqCopy) + } + + return nil +} + +func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) error { + log := c.logger.WithFields(logrus.Fields{ + "namespace": req.Namespace, + "name": req.Name, + "backup": req.Spec.BackupName, + }) + + var err error + + // Update status to InProgress + req, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) { + r.Status.Phase = v1.DeleteBackupRequestPhaseInProgress + }) + if err != nil { + return err + } + + // Get the backup we're trying to delete + backup, err := c.backupClient.Backups(req.Namespace).Get(req.Spec.BackupName, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + // Couldn't find backup - update status to Processed and record the not-found error + req, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) { + r.Status.Phase = v1.DeleteBackupRequestPhaseProcessed + r.Status.Errors = []string{"backup not found"} + }) + + return err + } + if err != nil { + return errors.Wrap(err, "error getting Backup") + } + + // If the backup includes snapshots but we don't currently have a PVProvider, we don't + // want to orphan the snapshots so skip deletion. + if c.snapshotService == nil && len(backup.Status.VolumeBackups) > 0 { + req, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) { + r.Status.Phase = v1.DeleteBackupRequestPhaseProcessed + r.Status.Errors = []string{"unable to delete backup because it includes PV snapshots and Ark is not configured with a PersistentVolumeProvider"} + }) + + return err + } + + // Set backup status to Deleting + backup, err = c.patchBackup(backup, func(b *v1.Backup) { + b.Status.Phase = v1.BackupPhaseDeleting + }) + if err != nil { + log.WithError(errors.WithStack(err)).Error("Error setting backup phase to deleting") + } + + var errs []string + + // Try to delete snapshots + log.Info("Removing PV snapshots") + for _, volumeBackup := range backup.Status.VolumeBackups { + log.WithField("snapshotID", volumeBackup.SnapshotID).Info("Removing snapshot associated with backup") + if err := c.snapshotService.DeleteSnapshot(volumeBackup.SnapshotID); err != nil { + errs = append(errs, errors.Wrapf(err, "error deleting snapshot %s", volumeBackup.SnapshotID).Error()) + } + } + + // Try to delete backup from object storage + log.Info("Removing backup from object storage") + if err := c.backupService.DeleteBackupDir(c.bucket, backup.Name); err != nil { + errs = append(errs, errors.Wrap(err, "error deleting backup from object storage").Error()) + } + + // Try to delete restores + log.Info("Removing restores") + if restores, err := c.restoreLister.Restores(backup.Namespace).List(labels.Everything()); err != nil { + log.WithError(errors.WithStack(err)).Error("Error listing restore API objects") + } else { + for _, restore := range restores { + if restore.Spec.BackupName != backup.Name { + continue + } + + restoreLog := log.WithField("restore", kube.NamespaceAndName(restore)) + + restoreLog.Info("Deleting restore referencing backup") + if err := c.restoreClient.Restores(restore.Namespace).Delete(restore.Name, &metav1.DeleteOptions{}); err != nil { + errs = append(errs, errors.Wrapf(err, "error deleting restore %s", kube.NamespaceAndName(restore)).Error()) + } + } + } + + if len(errs) == 0 { + // Only try to delete the backup object from kube if everything preceding went smoothly + err = c.backupClient.Backups(backup.Namespace).Delete(backup.Name, nil) + if err != nil { + errs = append(errs, errors.Wrapf(err, "error deleting backup %s", kube.NamespaceAndName(backup)).Error()) + } + } + + // Update status to processed and record errors + req, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) { + r.Status.Phase = v1.DeleteBackupRequestPhaseProcessed + r.Status.Errors = errs + }) + if err != nil { + return err + } + + // Everything deleted correctly, so we can delete all DeleteBackupRequests for this backup + if len(errs) == 0 { + listOptions := pkgbackup.NewDeleteBackupRequestListOptions(backup.Name) + err = c.deleteBackupRequestClient.DeleteBackupRequests(req.Namespace).DeleteCollection(nil, listOptions) + if err != nil { + // If this errors, all we can do is log it. + c.logger.WithField("backup", kube.NamespaceAndName(backup)).Error("error deleting all associated DeleteBackupRequests after successfully deleting the backup") + } + } + + return nil +} + +func (c *backupDeletionController) patchDeleteBackupRequest(req *v1.DeleteBackupRequest, mutate func(*v1.DeleteBackupRequest)) (*v1.DeleteBackupRequest, error) { + // Record original json + oldData, err := json.Marshal(req) + if err != nil { + return nil, errors.Wrap(err, "error marshalling original DeleteBackupRequest") + } + + // Mutate + mutate(req) + + // Record new json + newData, err := json.Marshal(req) + if err != nil { + return nil, errors.Wrap(err, "error marshalling updated DeleteBackupRequest") + } + + patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return nil, errors.Wrap(err, "error creating json merge patch for DeleteBackupRequest") + } + + req, err = c.deleteBackupRequestClient.DeleteBackupRequests(req.Namespace).Patch(req.Name, types.MergePatchType, patchBytes) + if err != nil { + return nil, errors.Wrap(err, "error patching DeleteBackupRequest") + } + + return req, nil +} + +func (c *backupDeletionController) patchBackup(backup *v1.Backup, mutate func(*v1.Backup)) (*v1.Backup, error) { + // Record original json + oldData, err := json.Marshal(backup) + if err != nil { + return nil, errors.Wrap(err, "error marshalling original Backup") + } + + // Mutate + mutate(backup) + + // Record new json + newData, err := json.Marshal(backup) + if err != nil { + return nil, errors.Wrap(err, "error marshalling updated Backup") + } + + patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return nil, errors.Wrap(err, "error creating json merge patch for Backup") + } + + backup, err = c.backupClient.Backups(backup.Namespace).Patch(backup.Name, types.MergePatchType, patchBytes) + if err != nil { + return nil, errors.Wrap(err, "error patching Backup") + } + + return backup, nil +} diff --git a/pkg/controller/backup_deletion_controller_test.go b/pkg/controller/backup_deletion_controller_test.go new file mode 100644 index 000000000..be243da4e --- /dev/null +++ b/pkg/controller/backup_deletion_controller_test.go @@ -0,0 +1,391 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + "reflect" + "testing" + "time" + + "github.com/heptio/ark/pkg/apis/ark/v1" + pkgbackup "github.com/heptio/ark/pkg/backup" + "github.com/heptio/ark/pkg/generated/clientset/versioned/fake" + informers "github.com/heptio/ark/pkg/generated/informers/externalversions" + "github.com/heptio/ark/pkg/util/kube" + arktest "github.com/heptio/ark/pkg/util/test" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/watch" + core "k8s.io/client-go/testing" +) + +func TestBackupDeletionControllerControllerHasUpdateFunc(t *testing.T) { + req := pkgbackup.NewDeleteBackupRequest("foo") + req.Namespace = "heptio-ark" + expected := kube.NamespaceAndName(req) + + client := fake.NewSimpleClientset(req) + + fakeWatch := watch.NewFake() + defer fakeWatch.Stop() + client.PrependWatchReactor("deletebackuprequests", core.DefaultWatchReactor(fakeWatch, nil)) + + sharedInformers := informers.NewSharedInformerFactory(client, 0) + + controller := NewBackupDeletionController( + arktest.NewLogger(), + sharedInformers.Ark().V1().DeleteBackupRequests(), + client.ArkV1(), // deleteBackupRequestClient + client.ArkV1(), // backupClient + nil, // snapshotService + nil, // backupService + "bucket", + sharedInformers.Ark().V1().Restores(), + client.ArkV1(), // restoreClient + ).(*backupDeletionController) + + keys := make(chan string) + + controller.syncHandler = func(key string) error { + keys <- key + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + go sharedInformers.Start(ctx.Done()) + go controller.Run(ctx, 1) + + // wait for the AddFunc + select { + case <-ctx.Done(): + t.Fatal("test timed out waiting for AddFunc") + case key := <-keys: + assert.Equal(t, expected, key) + } + + req.Status.Phase = v1.DeleteBackupRequestPhaseProcessed + fakeWatch.Add(req) + + // wait for the UpdateFunc + select { + case <-ctx.Done(): + t.Fatal("test timed out waiting for UpdateFunc") + case key := <-keys: + assert.Equal(t, expected, key) + } +} + +func TestBackupDeletionControllerProcessQueueItem(t *testing.T) { + client := fake.NewSimpleClientset() + sharedInformers := informers.NewSharedInformerFactory(client, 0) + + controller := NewBackupDeletionController( + arktest.NewLogger(), + sharedInformers.Ark().V1().DeleteBackupRequests(), + client.ArkV1(), // deleteBackupRequestClient + client.ArkV1(), // backupClient + nil, // snapshotService + nil, // backupService + "bucket", + sharedInformers.Ark().V1().Restores(), + client.ArkV1(), // restoreClient + ).(*backupDeletionController) + + // Error splitting key + err := controller.processQueueItem("foo/bar/baz") + assert.Error(t, err) + + // Can't find DeleteBackupRequest + err = controller.processQueueItem("foo/bar") + assert.NoError(t, err) + + // Already processed + req := pkgbackup.NewDeleteBackupRequest("foo") + req.Namespace = "foo" + req.Name = "foo-abcde" + req.Status.Phase = v1.DeleteBackupRequestPhaseProcessed + + err = controller.processQueueItem("foo/bar") + assert.NoError(t, err) + + // Invoke processRequestFunc + for _, phase := range []v1.DeleteBackupRequestPhase{"", v1.DeleteBackupRequestPhaseNew, v1.DeleteBackupRequestPhaseInProgress} { + t.Run(fmt.Sprintf("phase=%s", phase), func(t *testing.T) { + req.Status.Phase = phase + sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(req) + + var errorToReturn error + var actual *v1.DeleteBackupRequest + var called bool + controller.processRequestFunc = func(r *v1.DeleteBackupRequest) error { + called = true + actual = r + return errorToReturn + } + + // No error + err = controller.processQueueItem("foo/foo-abcde") + require.True(t, called, "processRequestFunc wasn't called") + assert.Equal(t, err, errorToReturn) + assert.Equal(t, req, actual) + + // Error + errorToReturn = errors.New("bar") + err = controller.processQueueItem("foo/foo-abcde") + require.True(t, called, "processRequestFunc wasn't called") + assert.Equal(t, err, errorToReturn) + }) + } +} + +type backupDeletionControllerTestData struct { + client *fake.Clientset + sharedInformers informers.SharedInformerFactory + backupService *arktest.BackupService + snapshotService *arktest.FakeSnapshotService + controller *backupDeletionController + req *v1.DeleteBackupRequest +} + +func setupBackupDeletionControllerTest(objects ...runtime.Object) *backupDeletionControllerTestData { + client := fake.NewSimpleClientset(objects...) + sharedInformers := informers.NewSharedInformerFactory(client, 0) + backupService := &arktest.BackupService{} + snapshotService := &arktest.FakeSnapshotService{SnapshotsTaken: sets.NewString()} + req := pkgbackup.NewDeleteBackupRequest("foo") + + data := &backupDeletionControllerTestData{ + client: client, + sharedInformers: sharedInformers, + backupService: backupService, + snapshotService: snapshotService, + controller: NewBackupDeletionController( + arktest.NewLogger(), + sharedInformers.Ark().V1().DeleteBackupRequests(), + client.ArkV1(), // deleteBackupRequestClient + client.ArkV1(), // backupClient + snapshotService, + backupService, + "bucket", + sharedInformers.Ark().V1().Restores(), + client.ArkV1(), // restoreClient + ).(*backupDeletionController), + + req: req, + } + req.Namespace = "heptio-ark" + req.Name = "foo-abcde" + + return data +} + +func TestBackupDeletionControllerProcessRequest(t *testing.T) { + t.Run("patching to InProgress fails", func(t *testing.T) { + td := setupBackupDeletionControllerTest() + defer td.backupService.AssertExpectations(t) + + td.client.PrependReactor("patch", "deletebackuprequests", func(action core.Action) (bool, runtime.Object, error) { + return true, nil, errors.New("bad") + }) + + err := td.controller.processRequest(td.req) + assert.EqualError(t, err, "error patching DeleteBackupRequest: bad") + }) + + t.Run("unable to find backup", func(t *testing.T) { + td := setupBackupDeletionControllerTest() + defer td.backupService.AssertExpectations(t) + + td.client.PrependReactor("get", "backups", func(action core.Action) (bool, runtime.Object, error) { + return true, nil, apierrors.NewNotFound(v1.SchemeGroupVersion.WithResource("backups").GroupResource(), "foo") + }) + + td.client.PrependReactor("patch", "deletebackuprequests", func(action core.Action) (bool, runtime.Object, error) { + return true, td.req, nil + }) + + err := td.controller.processRequest(td.req) + require.NoError(t, err) + + expectedActions := []core.Action{ + core.NewPatchAction( + v1.SchemeGroupVersion.WithResource("deletebackuprequests"), + td.req.Namespace, + td.req.Name, + []byte(`{"status":{"phase":"InProgress"}}`), + ), + core.NewGetAction( + v1.SchemeGroupVersion.WithResource("backups"), + td.req.Namespace, + td.req.Spec.BackupName, + ), + core.NewPatchAction( + v1.SchemeGroupVersion.WithResource("deletebackuprequests"), + td.req.Namespace, + td.req.Name, + []byte(`{"status":{"errors":["backup not found"],"phase":"Processed"}}`), + ), + } + + assert.Equal(t, expectedActions, td.client.Actions()) + }) + + t.Run("no snapshot service, backup has snapshots", func(t *testing.T) { + td := setupBackupDeletionControllerTest() + td.controller.snapshotService = nil + defer td.backupService.AssertExpectations(t) + + td.client.PrependReactor("get", "backups", func(action core.Action) (bool, runtime.Object, error) { + backup := arktest.NewTestBackup().WithName("backup-1").WithSnapshot("pv-1", "snap-1").Backup + return true, backup, nil + }) + + td.client.PrependReactor("patch", "deletebackuprequests", func(action core.Action) (bool, runtime.Object, error) { + return true, td.req, nil + }) + + err := td.controller.processRequest(td.req) + require.NoError(t, err) + + expectedActions := []core.Action{ + core.NewPatchAction( + v1.SchemeGroupVersion.WithResource("deletebackuprequests"), + td.req.Namespace, + td.req.Name, + []byte(`{"status":{"phase":"InProgress"}}`), + ), + core.NewGetAction( + v1.SchemeGroupVersion.WithResource("backups"), + td.req.Namespace, + td.req.Spec.BackupName, + ), + core.NewPatchAction( + v1.SchemeGroupVersion.WithResource("deletebackuprequests"), + td.req.Namespace, + td.req.Name, + []byte(`{"status":{"errors":["unable to delete backup because it includes PV snapshots and Ark is not configured with a PersistentVolumeProvider"],"phase":"Processed"}}`), + ), + } + + assert.Equal(t, expectedActions, td.client.Actions()) + }) + + t.Run("full delete, no errors", func(t *testing.T) { + backup := arktest.NewTestBackup().WithName("foo").WithSnapshot("pv-1", "snap-1").Backup + + restore1 := arktest.NewTestRestore("heptio-ark", "restore-1", v1.RestorePhaseCompleted).WithBackup("foo").Restore + restore2 := arktest.NewTestRestore("heptio-ark", "restore-2", v1.RestorePhaseCompleted).WithBackup("foo").Restore + restore3 := arktest.NewTestRestore("heptio-ark", "restore-3", v1.RestorePhaseCompleted).WithBackup("some-other-backup").Restore + + td := setupBackupDeletionControllerTest(backup, restore1, restore2, restore3) + + td.sharedInformers.Ark().V1().Restores().Informer().GetStore().Add(restore1) + td.sharedInformers.Ark().V1().Restores().Informer().GetStore().Add(restore2) + td.sharedInformers.Ark().V1().Restores().Informer().GetStore().Add(restore3) + + defer td.backupService.AssertExpectations(t) + + td.client.PrependReactor("get", "backups", func(action core.Action) (bool, runtime.Object, error) { + return true, backup, nil + }) + td.snapshotService.SnapshotsTaken.Insert("snap-1") + + td.client.PrependReactor("patch", "deletebackuprequests", func(action core.Action) (bool, runtime.Object, error) { + return true, td.req, nil + }) + + td.client.PrependReactor("patch", "backups", func(action core.Action) (bool, runtime.Object, error) { + return true, backup, nil + }) + + td.backupService.On("DeleteBackupDir", td.controller.bucket, td.req.Spec.BackupName).Return(nil) + + err := td.controller.processRequest(td.req) + require.NoError(t, err) + + expectedActions := []core.Action{ + core.NewPatchAction( + v1.SchemeGroupVersion.WithResource("deletebackuprequests"), + td.req.Namespace, + td.req.Name, + []byte(`{"status":{"phase":"InProgress"}}`), + ), + core.NewGetAction( + v1.SchemeGroupVersion.WithResource("backups"), + td.req.Namespace, + td.req.Spec.BackupName, + ), + core.NewPatchAction( + v1.SchemeGroupVersion.WithResource("backups"), + td.req.Namespace, + td.req.Spec.BackupName, + []byte(`{"status":{"phase":"Deleting"}}`), + ), + core.NewDeleteAction( + v1.SchemeGroupVersion.WithResource("restores"), + td.req.Namespace, + "restore-1", + ), + core.NewDeleteAction( + v1.SchemeGroupVersion.WithResource("restores"), + td.req.Namespace, + "restore-2", + ), + core.NewDeleteAction( + v1.SchemeGroupVersion.WithResource("backups"), + td.req.Namespace, + td.req.Spec.BackupName, + ), + core.NewPatchAction( + v1.SchemeGroupVersion.WithResource("deletebackuprequests"), + td.req.Namespace, + td.req.Name, + []byte(`{"status":{"phase":"Processed"}}`), + ), + core.NewDeleteCollectionAction( + v1.SchemeGroupVersion.WithResource("deletebackuprequests"), + td.req.Namespace, + pkgbackup.NewDeleteBackupRequestListOptions(td.req.Spec.BackupName), + ), + } + + assert.Len(t, td.client.Actions(), len(expectedActions)) + for _, e := range expectedActions { + found := false + for _, a := range td.client.Actions() { + if reflect.DeepEqual(e, a) { + found = true + break + } + } + if !found { + t.Errorf("missing expected action %#v", e) + } + } + + // Make sure snapshot was deleted + assert.Equal(t, 0, td.snapshotService.SnapshotsTaken.Len()) + }) +} diff --git a/pkg/controller/gc_controller.go b/pkg/controller/gc_controller.go index 8f3e7165e..586e4799e 100644 --- a/pkg/controller/gc_controller.go +++ b/pkg/controller/gc_controller.go @@ -17,65 +17,40 @@ limitations under the License. package controller import ( - "context" - "encoding/json" "time" + pkgbackup "github.com/heptio/ark/pkg/backup" "github.com/pkg/errors" "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" - kerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" - "k8s.io/kubernetes/pkg/util/version" - api "github.com/heptio/ark/pkg/apis/ark/v1" - "github.com/heptio/ark/pkg/cloudprovider" arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" - "github.com/heptio/ark/pkg/util/kube" - "github.com/heptio/ark/pkg/util/stringslice" ) -// MinVersionForDelete is the minimum Kubernetes server version that Ark -// requires in order to be able to properly delete backups (including -// the associated snapshots and object storage files). This is because -// Ark uses finalizers on the backup CRD to implement garbage-collection -// and deletion. -var MinVersionForDelete = version.MustParseSemantic("1.7.5") - -// gcController removes expired backup content from object storage. +// gcController creates DeleteBackupRequests for expired backups. type gcController struct { - backupService cloudprovider.BackupService - snapshotService cloudprovider.SnapshotService - bucket string - syncPeriod time.Duration - clock clock.Clock - backupLister listers.BackupLister - backupListerSynced cache.InformerSynced - backupClient arkv1client.BackupsGetter - restoreLister listers.RestoreLister - restoreListerSynced cache.InformerSynced - restoreClient arkv1client.RestoresGetter - logger logrus.FieldLogger + *genericController + + logger logrus.FieldLogger + backupLister listers.BackupLister + deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter + syncPeriod time.Duration + + clock clock.Clock } // NewGCController constructs a new gcController. func NewGCController( - backupService cloudprovider.BackupService, - snapshotService cloudprovider.SnapshotService, - bucket string, - syncPeriod time.Duration, - backupInformer informers.BackupInformer, - backupClient arkv1client.BackupsGetter, - restoreInformer informers.RestoreInformer, - restoreClient arkv1client.RestoresGetter, logger logrus.FieldLogger, + backupInformer informers.BackupInformer, + deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter, + syncPeriod time.Duration, ) Interface { if syncPeriod < time.Minute { logger.WithField("syncPeriod", syncPeriod).Info("Provided GC sync period is too short. Setting to 1 minute") @@ -83,161 +58,86 @@ func NewGCController( } c := &gcController{ - backupService: backupService, - snapshotService: snapshotService, - bucket: bucket, - syncPeriod: syncPeriod, - clock: clock.RealClock{}, - backupLister: backupInformer.Lister(), - backupListerSynced: backupInformer.Informer().HasSynced, - backupClient: backupClient, - restoreLister: restoreInformer.Lister(), - restoreListerSynced: restoreInformer.Informer().HasSynced, - restoreClient: restoreClient, - logger: logger, + genericController: newGenericController("gc-controller", logger), + syncPeriod: syncPeriod, + clock: clock.RealClock{}, + backupLister: backupInformer.Lister(), + deleteBackupRequestClient: deleteBackupRequestClient, + logger: logger, } + c.syncHandler = c.processQueueItem + c.cacheSyncWaiters = append(c.cacheSyncWaiters, backupInformer.Informer().HasSynced) + + c.resyncPeriod = syncPeriod + c.resyncFunc = c.enqueueAllBackups + backupInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ - AddFunc: c.handleFinalizer, - UpdateFunc: func(_, upd interface{}) { - c.handleFinalizer(upd) - }, + AddFunc: c.enqueue, + UpdateFunc: func(_, obj interface{}) { c.enqueue(obj) }, }, ) return c } -// handleFinalizer runs garbage-collection on a backup that has the Ark GC -// finalizer and a deletionTimestamp. -func (c *gcController) handleFinalizer(obj interface{}) { - var ( - backup = obj.(*api.Backup) - log = c.logger.WithField("backup", kube.NamespaceAndName(backup)) - ) +// enqueueAllBackups lists all backups from cache and enqueues all of them so we can check each one +// for expiration. +func (c *gcController) enqueueAllBackups() { + c.logger.Debug("gcController.enqueueAllBackups") - // we're only interested in backups that have a deletionTimestamp and at - // least one finalizer. - if backup.DeletionTimestamp == nil || len(backup.Finalizers) == 0 { - return - } - log.Debugf("Backup has finalizers %s", backup.Finalizers) - - if !stringslice.Has(backup.Finalizers, api.GCFinalizer) { - return - } - - log.Infof("Garbage-collecting backup") - if err := c.garbageCollect(backup, log); err != nil { - // if there were errors deleting related cloud resources, don't - // delete the backup API object because we don't want to orphan - // the cloud resources. - log.WithError(err).Error("Error deleting backup's related objects") - return - } - - patchMap := map[string]interface{}{ - "metadata": map[string]interface{}{ - "finalizers": stringslice.Except(backup.Finalizers, api.GCFinalizer), - "resourceVersion": backup.ResourceVersion, - }, - } - - patchBytes, err := json.Marshal(patchMap) - if err != nil { - log.WithError(err).Error("Error marshaling finalizers patch") - return - } - - if _, err = c.backupClient.Backups(backup.Namespace).Patch(backup.Name, types.MergePatchType, patchBytes); err != nil { - log.WithError(errors.WithStack(err)).Error("Error patching backup") - } -} - -// Run is a blocking function that runs a single worker to garbage-collect backups -// from object/block storage and the Ark API. It will return when it receives on the -// ctx.Done() channel. -func (c *gcController) Run(ctx context.Context, workers int) error { - c.logger.Info("Waiting for caches to sync") - if !cache.WaitForCacheSync(ctx.Done(), c.backupListerSynced, c.restoreListerSynced) { - return errors.New("timed out waiting for caches to sync") - } - c.logger.Info("Caches are synced") - - wait.Until(c.run, c.syncPeriod, ctx.Done()) - return nil -} - -func (c *gcController) run() { - now := c.clock.Now() - c.logger.Info("Garbage-collecting expired backups") - - // Go thru API objects and delete expired ones (finalizer will GC their - // corresponding files/snapshots/restores). Note that we're ignoring backups - // in object storage that haven't been synced to Kubernetes yet; they'll - // be processed for GC (if applicable) once they've been synced. backups, err := c.backupLister.List(labels.Everything()) if err != nil { - c.logger.WithError(errors.WithStack(err)).Error("Error getting all backups") + c.logger.WithError(errors.WithStack(err)).Error("error listing backups") return } for _, backup := range backups { - log := c.logger.WithField("backup", kube.NamespaceAndName(backup)) - if backup.Status.Expiration.Time.After(now) { - log.Debug("Backup has not expired yet, skipping") - continue - } - - // since backups have a finalizer, this will actually have the effect of setting a deletionTimestamp and calling - // an update. The update will be handled by this controller and will result in a deletion of the obj storage - // files and the API object. - if err := c.backupClient.Backups(backup.Namespace).Delete(backup.Name, &metav1.DeleteOptions{}); err != nil { - log.WithError(errors.WithStack(err)).Error("Error deleting backup") - } + c.enqueue(backup) } } -// garbageCollect prepares for deleting an expired backup by deleting any -// associated backup files, volume snapshots, or restore API objects. -func (c *gcController) garbageCollect(backup *api.Backup, log logrus.FieldLogger) error { - // if the backup includes snapshots but we don't currently have a PVProvider, we don't - // want to orphan the snapshots so skip garbage-collection entirely. - if c.snapshotService == nil && len(backup.Status.VolumeBackups) > 0 { - return errors.New("cannot garbage-collect backup because it includes snapshots and Ark is not configured with a PersistentVolumeProvider") +func (c *gcController) processQueueItem(key string) error { + log := c.logger.WithField("backup", key) + + ns, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return errors.Wrap(err, "error splitting queue key") } - var errs []error - - for _, volumeBackup := range backup.Status.VolumeBackups { - log.WithField("snapshotID", volumeBackup.SnapshotID).Info("Removing snapshot associated with backup") - if err := c.snapshotService.DeleteSnapshot(volumeBackup.SnapshotID); err != nil { - errs = append(errs, errors.Wrapf(err, "error deleting snapshot %s", volumeBackup.SnapshotID)) - } + backup, err := c.backupLister.Backups(ns).Get(name) + if apierrors.IsNotFound(err) { + log.Debug("Unable to find backup") + return nil + } + if err != nil { + return errors.Wrap(err, "error getting backup") } - log.Info("Removing backup from object storage") - if err := c.backupService.DeleteBackupDir(c.bucket, backup.Name); err != nil { - errs = append(errs, errors.Wrap(err, "error deleting backup from object storage")) + log = c.logger.WithFields( + logrus.Fields{ + "backup": key, + "expiration": backup.Status.Expiration.Time, + }, + ) + + now := c.clock.Now() + + expiration := backup.Status.Expiration.Time + if expiration.IsZero() || expiration.After(now) { + log.Debug("Backup has not expired yet, skipping") + return nil } - if restores, err := c.restoreLister.Restores(backup.Namespace).List(labels.Everything()); err != nil { - log.WithError(errors.WithStack(err)).Error("Error listing restore API objects") - } else { - for _, restore := range restores { - if restore.Spec.BackupName != backup.Name { - continue - } + log.Info("Backup has expired. Creating a DeleteBackupRequest.") - restoreLog := log.WithField("restore", kube.NamespaceAndName(restore)) + req := pkgbackup.NewDeleteBackupRequest(name) - restoreLog.Info("Deleting restore referencing backup") - if err := c.restoreClient.Restores(restore.Namespace).Delete(restore.Name, &metav1.DeleteOptions{}); err != nil { - restoreLog.WithError(errors.WithStack(err)).Error("Error deleting restore") - } - } + _, err = c.deleteBackupRequestClient.DeleteBackupRequests(ns).Create(req) + if err != nil { + return errors.Wrap(err, "error creating DeleteBackupRequest") } - return kerrors.NewAggregate(errs) + return nil } diff --git a/pkg/controller/gc_controller_test.go b/pkg/controller/gc_controller_test.go index dd26a0ea5..bbd902214 100644 --- a/pkg/controller/gc_controller_test.go +++ b/pkg/controller/gc_controller_test.go @@ -17,72 +17,170 @@ limitations under the License. package controller import ( - "errors" + "context" + "fmt" + "sort" "testing" "time" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" - "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/watch" core "k8s.io/client-go/testing" api "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/generated/clientset/versioned/fake" informers "github.com/heptio/ark/pkg/generated/informers/externalversions" + "github.com/heptio/ark/pkg/util/kube" arktest "github.com/heptio/ark/pkg/util/test" ) -func TestGCControllerRun(t *testing.T) { +func TestGCControllerEnqueueAllBackups(t *testing.T) { + var ( + client = fake.NewSimpleClientset() + sharedInformers = informers.NewSharedInformerFactory(client, 0) + + controller = NewGCController( + arktest.NewLogger(), + sharedInformers.Ark().V1().Backups(), + client.ArkV1(), + 1*time.Millisecond, + ).(*gcController) + ) + + // Have to clear this out so the controller doesn't wait + controller.cacheSyncWaiters = nil + + keys := make(chan string) + + controller.syncHandler = func(key string) error { + keys <- key + return nil + } + + var expected []string + + for i := 0; i < 3; i++ { + backup := arktest.NewTestBackup().WithName(fmt.Sprintf("backup-%d", i)).Backup + sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(backup) + expected = append(expected, kube.NamespaceAndName(backup)) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + go controller.Run(ctx, 1) + + var received []string + +Loop: + for { + select { + case <-ctx.Done(): + t.Fatal("test timed out") + case key := <-keys: + received = append(received, key) + if len(received) == len(expected) { + break Loop + } + } + } + + sort.Strings(expected) + sort.Strings(received) + assert.Equal(t, expected, received) +} + +func TestGCControllerHasUpdateFunc(t *testing.T) { + backup := arktest.NewTestBackup().WithName("backup").Backup + expected := kube.NamespaceAndName(backup) + + client := fake.NewSimpleClientset(backup) + + fakeWatch := watch.NewFake() + defer fakeWatch.Stop() + client.PrependWatchReactor("backups", core.DefaultWatchReactor(fakeWatch, nil)) + + sharedInformers := informers.NewSharedInformerFactory(client, 0) + + controller := NewGCController( + arktest.NewLogger(), + sharedInformers.Ark().V1().Backups(), + client.ArkV1(), + 1*time.Millisecond, + ).(*gcController) + + keys := make(chan string) + + controller.syncHandler = func(key string) error { + keys <- key + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + go sharedInformers.Start(ctx.Done()) + go controller.Run(ctx, 1) + + // wait for the AddFunc + select { + case <-ctx.Done(): + t.Fatal("test timed out waiting for AddFunc") + case key := <-keys: + assert.Equal(t, expected, key) + } + + backup.Status.Version = 1234 + fakeWatch.Add(backup) + + // wait for the UpdateFunc + select { + case <-ctx.Done(): + t.Fatal("test timed out waiting for UpdateFunc") + case key := <-keys: + assert.Equal(t, expected, key) + } +} + +func TestGCControllerProcessQueueItem(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) tests := []struct { - name string - backups []*api.Backup - snapshots sets.String - expectedDeletions sets.String + name string + backup *api.Backup + expectDeletion bool + createDeleteBackupRequestError bool + expectError bool }{ { - name: "no backups results in no deletions", + name: "can't find backup - no error", }, { name: "expired backup is deleted", - backups: []*api.Backup{ - arktest.NewTestBackup().WithName("backup-1"). - WithExpiration(fakeClock.Now().Add(-1*time.Second)). - WithSnapshot("pv-1", "snapshot-1"). - WithSnapshot("pv-2", "snapshot-2"). - Backup, - }, - expectedDeletions: sets.NewString("backup-1"), + backup: arktest.NewTestBackup().WithName("backup-1"). + WithExpiration(fakeClock.Now().Add(-1 * time.Second)). + Backup, + expectDeletion: true, }, { name: "unexpired backup is not deleted", - backups: []*api.Backup{ - arktest.NewTestBackup().WithName("backup-1"). - WithExpiration(fakeClock.Now().Add(1*time.Minute)). - WithSnapshot("pv-1", "snapshot-1"). - WithSnapshot("pv-2", "snapshot-2"). - Backup, - }, - expectedDeletions: sets.NewString(), + backup: arktest.NewTestBackup().WithName("backup-1"). + WithExpiration(fakeClock.Now().Add(1 * time.Minute)). + Backup, + expectDeletion: false, }, { - name: "expired backup is deleted and unexpired backup is not deleted", - backups: []*api.Backup{ - arktest.NewTestBackup().WithName("backup-1"). - WithExpiration(fakeClock.Now().Add(-1*time.Minute)). - WithSnapshot("pv-1", "snapshot-1"). - WithSnapshot("pv-2", "snapshot-2"). - Backup, - arktest.NewTestBackup().WithName("backup-2"). - WithExpiration(fakeClock.Now().Add(1*time.Minute)). - WithSnapshot("pv-3", "snapshot-3"). - WithSnapshot("pv-4", "snapshot-4"). - Backup, - }, - expectedDeletions: sets.NewString("backup-1"), + name: "create DeleteBackupRequest error returns an error", + backup: arktest.NewTestBackup().WithName("backup-1"). + WithExpiration(fakeClock.Now().Add(-1 * time.Second)). + Backup, + expectDeletion: true, + createDeleteBackupRequestError: true, + expectError: true, }, } @@ -94,277 +192,34 @@ func TestGCControllerRun(t *testing.T) { ) controller := NewGCController( - nil, - nil, - "bucket", - 1*time.Millisecond, + arktest.NewLogger(), sharedInformers.Ark().V1().Backups(), client.ArkV1(), - sharedInformers.Ark().V1().Restores(), - client.ArkV1(), - arktest.NewLogger(), + 1*time.Millisecond, ).(*gcController) controller.clock = fakeClock - for _, backup := range test.backups { - sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(backup) + var key string + if test.backup != nil { + key = kube.NamespaceAndName(test.backup) + sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(test.backup) } - expectedDeletions := make([]core.Action, 0, len(test.expectedDeletions)) - for backup := range test.expectedDeletions { - expectedDeletions = append(expectedDeletions, core.NewDeleteAction( - api.SchemeGroupVersion.WithResource("backups"), - api.DefaultNamespace, - backup, - )) + if test.createDeleteBackupRequestError { + client.PrependReactor("create", "deletebackuprequests", func(action core.Action) (bool, runtime.Object, error) { + return true, nil, errors.New("foo") + }) } - controller.run() + err := controller.processQueueItem(key) + gotErr := err != nil + assert.Equal(t, test.expectError, gotErr) - assert.Equal(t, expectedDeletions, client.Actions()) - }) - } -} - -func TestGarbageCollectBackup(t *testing.T) { - tests := []struct { - name string - backup *api.Backup - snapshots sets.String - restores []*api.Restore - nilSnapshotService bool - expectErr bool - expectBackupDirDeleted bool - }{ - { - name: "nil snapshot service when backup has snapshots returns error", - backup: arktest.NewTestBackup().WithName("backup-1").WithSnapshot("pv-1", "snap-1").Backup, - nilSnapshotService: true, - expectErr: true, - }, - { - name: "nil snapshot service when backup doesn't have snapshots correctly garbage-collects", - backup: arktest.NewTestBackup().WithName("backup-1").Backup, - nilSnapshotService: true, - expectBackupDirDeleted: true, - }, - { - name: "return error if snapshot deletion fails", - backup: arktest.NewTestBackup().WithName("backup-1"). - WithSnapshot("pv-1", "snapshot-1"). - WithSnapshot("pv-2", "snapshot-2"). - Backup, - snapshots: sets.NewString("snapshot-1"), - expectBackupDirDeleted: true, - expectErr: true, - }, - { - name: "related restores should be deleted", - backup: arktest.NewTestBackup().WithName("backup-1").Backup, - snapshots: sets.NewString(), - restores: []*api.Restore{ - arktest.NewTestRestore(api.DefaultNamespace, "restore-1", api.RestorePhaseCompleted).WithBackup("backup-1").Restore, - arktest.NewTestRestore(api.DefaultNamespace, "restore-2", api.RestorePhaseCompleted).WithBackup("backup-2").Restore, - }, - expectBackupDirDeleted: true, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - var ( - backupService = &arktest.BackupService{} - snapshotService = &arktest.FakeSnapshotService{SnapshotsTaken: test.snapshots} - client = fake.NewSimpleClientset() - sharedInformers = informers.NewSharedInformerFactory(client, 0) - controller = NewGCController( - backupService, - snapshotService, - "bucket-1", - 1*time.Millisecond, - sharedInformers.Ark().V1().Backups(), - client.ArkV1(), - sharedInformers.Ark().V1().Restores(), - client.ArkV1(), - arktest.NewLogger(), - ).(*gcController) - ) - - if test.nilSnapshotService { - controller.snapshotService = nil - } - - sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(test.backup) - for _, restore := range test.restores { - sharedInformers.Ark().V1().Restores().Informer().GetStore().Add(restore) - } - - if test.expectBackupDirDeleted { - backupService.On("DeleteBackupDir", controller.bucket, test.backup.Name).Return(nil) - } - - // METHOD UNDER TEST - err := controller.garbageCollect(test.backup, controller.logger) - - // VERIFY: - - // error - assert.Equal(t, test.expectErr, err != nil) - - // remaining snapshots - if !test.nilSnapshotService { - backupSnapshots := sets.NewString() - for _, snapshot := range test.backup.Status.VolumeBackups { - backupSnapshots.Insert(snapshot.SnapshotID) - } - - assert.Equal(t, test.snapshots.Difference(backupSnapshots), snapshotService.SnapshotsTaken) - } - - // restore client deletes - expectedActions := make([]core.Action, 0) - for _, restore := range test.restores { - if restore.Spec.BackupName != test.backup.Name { - continue - } - - action := core.NewDeleteAction( - api.SchemeGroupVersion.WithResource("restores"), - api.DefaultNamespace, - restore.Name, - ) - expectedActions = append(expectedActions, action) - } - assert.Equal(t, expectedActions, client.Actions()) - - // backup dir deletion - backupService.AssertExpectations(t) - }) - } -} - -func TestGarbageCollectPicksUpBackupUponExpiration(t *testing.T) { - var ( - fakeClock = clock.NewFakeClock(time.Now()) - client = fake.NewSimpleClientset() - sharedInformers = informers.NewSharedInformerFactory(client, 0) - backup = arktest.NewTestBackup().WithName("backup-1"). - WithExpiration(fakeClock.Now().Add(1*time.Second)). - WithSnapshot("pv-1", "snapshot-1"). - WithSnapshot("pv-2", "snapshot-2"). - Backup - ) - - controller := NewGCController( - nil, - nil, - "bucket", - 1*time.Millisecond, - sharedInformers.Ark().V1().Backups(), - client.ArkV1(), - sharedInformers.Ark().V1().Restores(), - client.ArkV1(), - arktest.NewLogger(), - ).(*gcController) - controller.clock = fakeClock - - sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(backup) - - // PASS 1 - controller.run() - assert.Equal(t, 0, len(client.Actions())) - - // PASS 2 - expectedActions := []core.Action{ - core.NewDeleteAction( - api.SchemeGroupVersion.WithResource("backups"), - api.DefaultNamespace, - "backup-1", - ), - } - - fakeClock.Step(1 * time.Minute) - controller.run() - - assert.Equal(t, expectedActions, client.Actions()) -} - -func TestHandleFinalizer(t *testing.T) { - tests := []struct { - name string - backup *api.Backup - deleteBackupDirError bool - expectGarbageCollect bool - expectedPatch []byte - }{ - { - name: "nil deletionTimestamp exits early", - backup: arktest.NewTestBackup().Backup, - }, - { - name: "no finalizers exits early", - backup: arktest.NewTestBackup().WithDeletionTimestamp(time.Now()).Backup, - }, - { - name: "no GCFinalizer exits early", - backup: arktest.NewTestBackup().WithDeletionTimestamp(time.Now()).WithFinalizers("foo").Backup, - }, - { - name: "error when calling garbageCollect exits without patch", - backup: arktest.NewTestBackup().WithDeletionTimestamp(time.Now()).WithFinalizers(api.GCFinalizer).Backup, - deleteBackupDirError: true, - }, - { - name: "normal case - patch includes the appropriate fields", - backup: arktest.NewTestBackup().WithDeletionTimestamp(time.Now()).WithFinalizers(api.GCFinalizer, "foo").WithResourceVersion("1").Backup, - expectGarbageCollect: true, - expectedPatch: []byte(`{"metadata":{"finalizers":["foo"],"resourceVersion":"1"}}`), - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - var ( - backupService = &arktest.BackupService{} - client = fake.NewSimpleClientset() - sharedInformers = informers.NewSharedInformerFactory(client, 0) - controller = NewGCController( - backupService, - nil, - "bucket-1", - 1*time.Millisecond, - sharedInformers.Ark().V1().Backups(), - client.ArkV1(), - sharedInformers.Ark().V1().Restores(), - client.ArkV1(), - arktest.NewLogger(), - ).(*gcController) - ) - - if test.expectGarbageCollect { - backupService.On("DeleteBackupDir", controller.bucket, test.backup.Name).Return(nil) - } else if test.deleteBackupDirError { - backupService.On("DeleteBackupDir", controller.bucket, test.backup.Name).Return(errors.New("foo")) - } - - // METHOD UNDER TEST - controller.handleFinalizer(test.backup) - - // VERIFY - backupService.AssertExpectations(t) - - actions := client.Actions() - - if test.expectedPatch == nil { - assert.Equal(t, 0, len(actions)) - return - } - - require.Equal(t, 1, len(actions)) - patchAction, ok := actions[0].(core.PatchAction) - require.True(t, ok, "action is not a PatchAction") - - assert.Equal(t, test.expectedPatch, patchAction.GetPatch()) + if test.expectDeletion { + assert.Len(t, client.Actions(), 1) + } else { + assert.Len(t, client.Actions(), 0) + } }) } } diff --git a/pkg/controller/generic_controller.go b/pkg/controller/generic_controller.go new file mode 100644 index 000000000..c83dd55f0 --- /dev/null +++ b/pkg/controller/generic_controller.go @@ -0,0 +1,153 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type genericController struct { + name string + queue workqueue.RateLimitingInterface + logger logrus.FieldLogger + syncHandler func(key string) error + resyncFunc func() + resyncPeriod time.Duration + cacheSyncWaiters []cache.InformerSynced +} + +func newGenericController(name string, logger logrus.FieldLogger) *genericController { + c := &genericController{ + name: name, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), + logger: logger.WithField("controller", name), + } + + return c +} + +// Run is a blocking function that runs the specified number of worker goroutines +// to process items in the work queue. It will return when it receives on the +// ctx.Done() channel. +func (c *genericController) Run(ctx context.Context, numWorkers int) error { + if c.syncHandler == nil { + // programmer error + panic("syncHandler is required") + } + + var wg sync.WaitGroup + + defer func() { + c.logger.Info("Waiting for workers to finish their work") + + c.queue.ShutDown() + + // We have to wait here in the deferred function instead of at the bottom of the function body + // because we have to shut down the queue in order for the workers to shut down gracefully, and + // we want to shut down the queue via defer and not at the end of the body. + wg.Wait() + + c.logger.Info("All workers have finished") + + }() + + c.logger.Info("Starting controller") + defer c.logger.Info("Shutting down controller") + + c.logger.Info("Waiting for caches to sync") + if !cache.WaitForCacheSync(ctx.Done(), c.cacheSyncWaiters...) { + return errors.New("timed out waiting for caches to sync") + } + c.logger.Info("Caches are synced") + + wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { + go func() { + wait.Until(c.runWorker, time.Second, ctx.Done()) + wg.Done() + }() + } + + if c.resyncFunc != nil { + if c.resyncPeriod == 0 { + // Programmer error + panic("non-zero resyncPeriod is required") + } + + wg.Add(1) + go func() { + wait.Until(c.resyncFunc, c.resyncPeriod, ctx.Done()) + wg.Done() + }() + } + + <-ctx.Done() + + return nil +} + +func (c *genericController) runWorker() { + // continually take items off the queue (waits if it's + // empty) until we get a shutdown signal from the queue + for c.processNextWorkItem() { + } +} + +func (c *genericController) processNextWorkItem() bool { + key, quit := c.queue.Get() + if quit { + return false + } + // always call done on this item, since if it fails we'll add + // it back with rate-limiting below + defer c.queue.Done(key) + + err := c.syncHandler(key.(string)) + if err == nil { + // If you had no error, tell the queue to stop tracking history for your key. This will reset + // things like failure counts for per-item rate limiting. + c.queue.Forget(key) + return true + } + + c.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue") + // we had an error processing the item so add it back + // into the queue for re-processing with rate-limiting + c.queue.AddRateLimited(key) + + return true +} + +func (c *genericController) enqueue(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + c.logger.WithError(errors.WithStack(err)). + Error("Error creating queue key, item not added to queue") + return + } + + c.queue.Add(key) +} diff --git a/pkg/generated/clientset/versioned/typed/ark/v1/ark_client.go b/pkg/generated/clientset/versioned/typed/ark/v1/ark_client.go index 4921e71d7..fe95a374e 100644 --- a/pkg/generated/clientset/versioned/typed/ark/v1/ark_client.go +++ b/pkg/generated/clientset/versioned/typed/ark/v1/ark_client.go @@ -26,6 +26,7 @@ type ArkV1Interface interface { RESTClient() rest.Interface BackupsGetter ConfigsGetter + DeleteBackupRequestsGetter DownloadRequestsGetter RestoresGetter SchedulesGetter @@ -44,6 +45,10 @@ func (c *ArkV1Client) Configs(namespace string) ConfigInterface { return newConfigs(c, namespace) } +func (c *ArkV1Client) DeleteBackupRequests(namespace string) DeleteBackupRequestInterface { + return newDeleteBackupRequests(c, namespace) +} + func (c *ArkV1Client) DownloadRequests(namespace string) DownloadRequestInterface { return newDownloadRequests(c, namespace) } diff --git a/pkg/generated/clientset/versioned/typed/ark/v1/deletebackuprequest.go b/pkg/generated/clientset/versioned/typed/ark/v1/deletebackuprequest.go new file mode 100644 index 000000000..a1730df05 --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/ark/v1/deletebackuprequest.go @@ -0,0 +1,171 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package v1 + +import ( + v1 "github.com/heptio/ark/pkg/apis/ark/v1" + scheme "github.com/heptio/ark/pkg/generated/clientset/versioned/scheme" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// DeleteBackupRequestsGetter has a method to return a DeleteBackupRequestInterface. +// A group's client should implement this interface. +type DeleteBackupRequestsGetter interface { + DeleteBackupRequests(namespace string) DeleteBackupRequestInterface +} + +// DeleteBackupRequestInterface has methods to work with DeleteBackupRequest resources. +type DeleteBackupRequestInterface interface { + Create(*v1.DeleteBackupRequest) (*v1.DeleteBackupRequest, error) + Update(*v1.DeleteBackupRequest) (*v1.DeleteBackupRequest, error) + UpdateStatus(*v1.DeleteBackupRequest) (*v1.DeleteBackupRequest, error) + Delete(name string, options *meta_v1.DeleteOptions) error + DeleteCollection(options *meta_v1.DeleteOptions, listOptions meta_v1.ListOptions) error + Get(name string, options meta_v1.GetOptions) (*v1.DeleteBackupRequest, error) + List(opts meta_v1.ListOptions) (*v1.DeleteBackupRequestList, error) + Watch(opts meta_v1.ListOptions) (watch.Interface, error) + Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.DeleteBackupRequest, err error) + DeleteBackupRequestExpansion +} + +// deleteBackupRequests implements DeleteBackupRequestInterface +type deleteBackupRequests struct { + client rest.Interface + ns string +} + +// newDeleteBackupRequests returns a DeleteBackupRequests +func newDeleteBackupRequests(c *ArkV1Client, namespace string) *deleteBackupRequests { + return &deleteBackupRequests{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the deleteBackupRequest, and returns the corresponding deleteBackupRequest object, and an error if there is any. +func (c *deleteBackupRequests) Get(name string, options meta_v1.GetOptions) (result *v1.DeleteBackupRequest, err error) { + result = &v1.DeleteBackupRequest{} + err = c.client.Get(). + Namespace(c.ns). + Resource("deletebackuprequests"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of DeleteBackupRequests that match those selectors. +func (c *deleteBackupRequests) List(opts meta_v1.ListOptions) (result *v1.DeleteBackupRequestList, err error) { + result = &v1.DeleteBackupRequestList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("deletebackuprequests"). + VersionedParams(&opts, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested deleteBackupRequests. +func (c *deleteBackupRequests) Watch(opts meta_v1.ListOptions) (watch.Interface, error) { + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("deletebackuprequests"). + VersionedParams(&opts, scheme.ParameterCodec). + Watch() +} + +// Create takes the representation of a deleteBackupRequest and creates it. Returns the server's representation of the deleteBackupRequest, and an error, if there is any. +func (c *deleteBackupRequests) Create(deleteBackupRequest *v1.DeleteBackupRequest) (result *v1.DeleteBackupRequest, err error) { + result = &v1.DeleteBackupRequest{} + err = c.client.Post(). + Namespace(c.ns). + Resource("deletebackuprequests"). + Body(deleteBackupRequest). + Do(). + Into(result) + return +} + +// Update takes the representation of a deleteBackupRequest and updates it. Returns the server's representation of the deleteBackupRequest, and an error, if there is any. +func (c *deleteBackupRequests) Update(deleteBackupRequest *v1.DeleteBackupRequest) (result *v1.DeleteBackupRequest, err error) { + result = &v1.DeleteBackupRequest{} + err = c.client.Put(). + Namespace(c.ns). + Resource("deletebackuprequests"). + Name(deleteBackupRequest.Name). + Body(deleteBackupRequest). + Do(). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). + +func (c *deleteBackupRequests) UpdateStatus(deleteBackupRequest *v1.DeleteBackupRequest) (result *v1.DeleteBackupRequest, err error) { + result = &v1.DeleteBackupRequest{} + err = c.client.Put(). + Namespace(c.ns). + Resource("deletebackuprequests"). + Name(deleteBackupRequest.Name). + SubResource("status"). + Body(deleteBackupRequest). + Do(). + Into(result) + return +} + +// Delete takes name of the deleteBackupRequest and deletes it. Returns an error if one occurs. +func (c *deleteBackupRequests) Delete(name string, options *meta_v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("deletebackuprequests"). + Name(name). + Body(options). + Do(). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *deleteBackupRequests) DeleteCollection(options *meta_v1.DeleteOptions, listOptions meta_v1.ListOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("deletebackuprequests"). + VersionedParams(&listOptions, scheme.ParameterCodec). + Body(options). + Do(). + Error() +} + +// Patch applies the patch and returns the patched deleteBackupRequest. +func (c *deleteBackupRequests) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.DeleteBackupRequest, err error) { + result = &v1.DeleteBackupRequest{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("deletebackuprequests"). + SubResource(subresources...). + Name(name). + Body(data). + Do(). + Into(result) + return +} diff --git a/pkg/generated/clientset/versioned/typed/ark/v1/fake/fake_ark_client.go b/pkg/generated/clientset/versioned/typed/ark/v1/fake/fake_ark_client.go index cd5ea1a97..9e7659017 100644 --- a/pkg/generated/clientset/versioned/typed/ark/v1/fake/fake_ark_client.go +++ b/pkg/generated/clientset/versioned/typed/ark/v1/fake/fake_ark_client.go @@ -33,6 +33,10 @@ func (c *FakeArkV1) Configs(namespace string) v1.ConfigInterface { return &FakeConfigs{c, namespace} } +func (c *FakeArkV1) DeleteBackupRequests(namespace string) v1.DeleteBackupRequestInterface { + return &FakeDeleteBackupRequests{c, namespace} +} + func (c *FakeArkV1) DownloadRequests(namespace string) v1.DownloadRequestInterface { return &FakeDownloadRequests{c, namespace} } diff --git a/pkg/generated/clientset/versioned/typed/ark/v1/fake/fake_deletebackuprequest.go b/pkg/generated/clientset/versioned/typed/ark/v1/fake/fake_deletebackuprequest.go new file mode 100644 index 000000000..3852c16b7 --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/ark/v1/fake/fake_deletebackuprequest.go @@ -0,0 +1,137 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package fake + +import ( + ark_v1 "github.com/heptio/ark/pkg/apis/ark/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeDeleteBackupRequests implements DeleteBackupRequestInterface +type FakeDeleteBackupRequests struct { + Fake *FakeArkV1 + ns string +} + +var deletebackuprequestsResource = schema.GroupVersionResource{Group: "ark.heptio.com", Version: "v1", Resource: "deletebackuprequests"} + +var deletebackuprequestsKind = schema.GroupVersionKind{Group: "ark.heptio.com", Version: "v1", Kind: "DeleteBackupRequest"} + +// Get takes name of the deleteBackupRequest, and returns the corresponding deleteBackupRequest object, and an error if there is any. +func (c *FakeDeleteBackupRequests) Get(name string, options v1.GetOptions) (result *ark_v1.DeleteBackupRequest, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(deletebackuprequestsResource, c.ns, name), &ark_v1.DeleteBackupRequest{}) + + if obj == nil { + return nil, err + } + return obj.(*ark_v1.DeleteBackupRequest), err +} + +// List takes label and field selectors, and returns the list of DeleteBackupRequests that match those selectors. +func (c *FakeDeleteBackupRequests) List(opts v1.ListOptions) (result *ark_v1.DeleteBackupRequestList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(deletebackuprequestsResource, deletebackuprequestsKind, c.ns, opts), &ark_v1.DeleteBackupRequestList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &ark_v1.DeleteBackupRequestList{} + for _, item := range obj.(*ark_v1.DeleteBackupRequestList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested deleteBackupRequests. +func (c *FakeDeleteBackupRequests) Watch(opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(deletebackuprequestsResource, c.ns, opts)) + +} + +// Create takes the representation of a deleteBackupRequest and creates it. Returns the server's representation of the deleteBackupRequest, and an error, if there is any. +func (c *FakeDeleteBackupRequests) Create(deleteBackupRequest *ark_v1.DeleteBackupRequest) (result *ark_v1.DeleteBackupRequest, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(deletebackuprequestsResource, c.ns, deleteBackupRequest), &ark_v1.DeleteBackupRequest{}) + + if obj == nil { + return nil, err + } + return obj.(*ark_v1.DeleteBackupRequest), err +} + +// Update takes the representation of a deleteBackupRequest and updates it. Returns the server's representation of the deleteBackupRequest, and an error, if there is any. +func (c *FakeDeleteBackupRequests) Update(deleteBackupRequest *ark_v1.DeleteBackupRequest) (result *ark_v1.DeleteBackupRequest, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(deletebackuprequestsResource, c.ns, deleteBackupRequest), &ark_v1.DeleteBackupRequest{}) + + if obj == nil { + return nil, err + } + return obj.(*ark_v1.DeleteBackupRequest), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeDeleteBackupRequests) UpdateStatus(deleteBackupRequest *ark_v1.DeleteBackupRequest) (*ark_v1.DeleteBackupRequest, error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateSubresourceAction(deletebackuprequestsResource, "status", c.ns, deleteBackupRequest), &ark_v1.DeleteBackupRequest{}) + + if obj == nil { + return nil, err + } + return obj.(*ark_v1.DeleteBackupRequest), err +} + +// Delete takes name of the deleteBackupRequest and deletes it. Returns an error if one occurs. +func (c *FakeDeleteBackupRequests) Delete(name string, options *v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteAction(deletebackuprequestsResource, c.ns, name), &ark_v1.DeleteBackupRequest{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeDeleteBackupRequests) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(deletebackuprequestsResource, c.ns, listOptions) + + _, err := c.Fake.Invokes(action, &ark_v1.DeleteBackupRequestList{}) + return err +} + +// Patch applies the patch and returns the patched deleteBackupRequest. +func (c *FakeDeleteBackupRequests) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *ark_v1.DeleteBackupRequest, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(deletebackuprequestsResource, c.ns, name, data, subresources...), &ark_v1.DeleteBackupRequest{}) + + if obj == nil { + return nil, err + } + return obj.(*ark_v1.DeleteBackupRequest), err +} diff --git a/pkg/generated/clientset/versioned/typed/ark/v1/generated_expansion.go b/pkg/generated/clientset/versioned/typed/ark/v1/generated_expansion.go index ee208c31a..37d08c935 100644 --- a/pkg/generated/clientset/versioned/typed/ark/v1/generated_expansion.go +++ b/pkg/generated/clientset/versioned/typed/ark/v1/generated_expansion.go @@ -19,6 +19,8 @@ type BackupExpansion interface{} type ConfigExpansion interface{} +type DeleteBackupRequestExpansion interface{} + type DownloadRequestExpansion interface{} type RestoreExpansion interface{} diff --git a/pkg/generated/informers/externalversions/ark/v1/deletebackuprequest.go b/pkg/generated/informers/externalversions/ark/v1/deletebackuprequest.go new file mode 100644 index 000000000..0f62dfd78 --- /dev/null +++ b/pkg/generated/informers/externalversions/ark/v1/deletebackuprequest.go @@ -0,0 +1,88 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This file was automatically generated by informer-gen + +package v1 + +import ( + ark_v1 "github.com/heptio/ark/pkg/apis/ark/v1" + versioned "github.com/heptio/ark/pkg/generated/clientset/versioned" + internalinterfaces "github.com/heptio/ark/pkg/generated/informers/externalversions/internalinterfaces" + v1 "github.com/heptio/ark/pkg/generated/listers/ark/v1" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" + time "time" +) + +// DeleteBackupRequestInformer provides access to a shared informer and lister for +// DeleteBackupRequests. +type DeleteBackupRequestInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1.DeleteBackupRequestLister +} + +type deleteBackupRequestInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewDeleteBackupRequestInformer constructs a new informer for DeleteBackupRequest type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewDeleteBackupRequestInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredDeleteBackupRequestInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredDeleteBackupRequestInformer constructs a new informer for DeleteBackupRequest type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredDeleteBackupRequestInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.ArkV1().DeleteBackupRequests(namespace).List(options) + }, + WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.ArkV1().DeleteBackupRequests(namespace).Watch(options) + }, + }, + &ark_v1.DeleteBackupRequest{}, + resyncPeriod, + indexers, + ) +} + +func (f *deleteBackupRequestInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredDeleteBackupRequestInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *deleteBackupRequestInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&ark_v1.DeleteBackupRequest{}, f.defaultInformer) +} + +func (f *deleteBackupRequestInformer) Lister() v1.DeleteBackupRequestLister { + return v1.NewDeleteBackupRequestLister(f.Informer().GetIndexer()) +} diff --git a/pkg/generated/informers/externalversions/ark/v1/interface.go b/pkg/generated/informers/externalversions/ark/v1/interface.go index 1a27a0fd9..ae954275b 100644 --- a/pkg/generated/informers/externalversions/ark/v1/interface.go +++ b/pkg/generated/informers/externalversions/ark/v1/interface.go @@ -28,6 +28,8 @@ type Interface interface { Backups() BackupInformer // Configs returns a ConfigInformer. Configs() ConfigInformer + // DeleteBackupRequests returns a DeleteBackupRequestInformer. + DeleteBackupRequests() DeleteBackupRequestInformer // DownloadRequests returns a DownloadRequestInformer. DownloadRequests() DownloadRequestInformer // Restores returns a RestoreInformer. @@ -57,6 +59,11 @@ func (v *version) Configs() ConfigInformer { return &configInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// DeleteBackupRequests returns a DeleteBackupRequestInformer. +func (v *version) DeleteBackupRequests() DeleteBackupRequestInformer { + return &deleteBackupRequestInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // DownloadRequests returns a DownloadRequestInformer. func (v *version) DownloadRequests() DownloadRequestInformer { return &downloadRequestInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/pkg/generated/informers/externalversions/generic.go b/pkg/generated/informers/externalversions/generic.go index 4b0a584bf..f4468b76b 100644 --- a/pkg/generated/informers/externalversions/generic.go +++ b/pkg/generated/informers/externalversions/generic.go @@ -56,6 +56,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Ark().V1().Backups().Informer()}, nil case v1.SchemeGroupVersion.WithResource("configs"): return &genericInformer{resource: resource.GroupResource(), informer: f.Ark().V1().Configs().Informer()}, nil + case v1.SchemeGroupVersion.WithResource("deletebackuprequests"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Ark().V1().DeleteBackupRequests().Informer()}, nil case v1.SchemeGroupVersion.WithResource("downloadrequests"): return &genericInformer{resource: resource.GroupResource(), informer: f.Ark().V1().DownloadRequests().Informer()}, nil case v1.SchemeGroupVersion.WithResource("restores"): diff --git a/pkg/generated/listers/ark/v1/deletebackuprequest.go b/pkg/generated/listers/ark/v1/deletebackuprequest.go new file mode 100644 index 000000000..63d2b81bb --- /dev/null +++ b/pkg/generated/listers/ark/v1/deletebackuprequest.go @@ -0,0 +1,94 @@ +/* +Copyright 2018 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This file was automatically generated by lister-gen + +package v1 + +import ( + v1 "github.com/heptio/ark/pkg/apis/ark/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// DeleteBackupRequestLister helps list DeleteBackupRequests. +type DeleteBackupRequestLister interface { + // List lists all DeleteBackupRequests in the indexer. + List(selector labels.Selector) (ret []*v1.DeleteBackupRequest, err error) + // DeleteBackupRequests returns an object that can list and get DeleteBackupRequests. + DeleteBackupRequests(namespace string) DeleteBackupRequestNamespaceLister + DeleteBackupRequestListerExpansion +} + +// deleteBackupRequestLister implements the DeleteBackupRequestLister interface. +type deleteBackupRequestLister struct { + indexer cache.Indexer +} + +// NewDeleteBackupRequestLister returns a new DeleteBackupRequestLister. +func NewDeleteBackupRequestLister(indexer cache.Indexer) DeleteBackupRequestLister { + return &deleteBackupRequestLister{indexer: indexer} +} + +// List lists all DeleteBackupRequests in the indexer. +func (s *deleteBackupRequestLister) List(selector labels.Selector) (ret []*v1.DeleteBackupRequest, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1.DeleteBackupRequest)) + }) + return ret, err +} + +// DeleteBackupRequests returns an object that can list and get DeleteBackupRequests. +func (s *deleteBackupRequestLister) DeleteBackupRequests(namespace string) DeleteBackupRequestNamespaceLister { + return deleteBackupRequestNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// DeleteBackupRequestNamespaceLister helps list and get DeleteBackupRequests. +type DeleteBackupRequestNamespaceLister interface { + // List lists all DeleteBackupRequests in the indexer for a given namespace. + List(selector labels.Selector) (ret []*v1.DeleteBackupRequest, err error) + // Get retrieves the DeleteBackupRequest from the indexer for a given namespace and name. + Get(name string) (*v1.DeleteBackupRequest, error) + DeleteBackupRequestNamespaceListerExpansion +} + +// deleteBackupRequestNamespaceLister implements the DeleteBackupRequestNamespaceLister +// interface. +type deleteBackupRequestNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all DeleteBackupRequests in the indexer for a given namespace. +func (s deleteBackupRequestNamespaceLister) List(selector labels.Selector) (ret []*v1.DeleteBackupRequest, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1.DeleteBackupRequest)) + }) + return ret, err +} + +// Get retrieves the DeleteBackupRequest from the indexer for a given namespace and name. +func (s deleteBackupRequestNamespaceLister) Get(name string) (*v1.DeleteBackupRequest, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1.Resource("deletebackuprequest"), name) + } + return obj.(*v1.DeleteBackupRequest), nil +} diff --git a/pkg/generated/listers/ark/v1/expansion_generated.go b/pkg/generated/listers/ark/v1/expansion_generated.go index 5f7798a96..1bdb0f6f3 100644 --- a/pkg/generated/listers/ark/v1/expansion_generated.go +++ b/pkg/generated/listers/ark/v1/expansion_generated.go @@ -34,6 +34,14 @@ type ConfigListerExpansion interface{} // ConfigNamespaceLister. type ConfigNamespaceListerExpansion interface{} +// DeleteBackupRequestListerExpansion allows custom methods to be added to +// DeleteBackupRequestLister. +type DeleteBackupRequestListerExpansion interface{} + +// DeleteBackupRequestNamespaceListerExpansion allows custom methods to be added to +// DeleteBackupRequestNamespaceLister. +type DeleteBackupRequestNamespaceListerExpansion interface{} + // DownloadRequestListerExpansion allows custom methods to be added to // DownloadRequestLister. type DownloadRequestListerExpansion interface{} diff --git a/pkg/util/test/test_backup.go b/pkg/util/test/test_backup.go index e2440a4a4..bd870fd29 100644 --- a/pkg/util/test/test_backup.go +++ b/pkg/util/test/test_backup.go @@ -120,11 +120,6 @@ func (b *TestBackup) WithDeletionTimestamp(time time.Time) *TestBackup { return b } -func (b *TestBackup) WithFinalizers(finalizers ...string) *TestBackup { - b.Finalizers = finalizers - return b -} - func (b *TestBackup) WithResourceVersion(version string) *TestBackup { b.ResourceVersion = version return b