From c6f488f75fb7435c2bd1ba49a43a70adf7379aa4 Mon Sep 17 00:00:00 2001 From: Nolan Brubaker Date: Thu, 16 Aug 2018 18:41:59 -0400 Subject: [PATCH] Use backup location in the backup controller Fixes #739 Signed-off-by: Nolan Brubaker --- pkg/cmd/server/server.go | 4 +- pkg/controller/backup_controller.go | 109 +++++++++++++++-------- pkg/controller/backup_controller_test.go | 85 ++++++++++++++++-- pkg/util/test/test_backup.go | 5 ++ 4 files changed, 153 insertions(+), 50 deletions(-) diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 8f95f84c0..d95aed7da 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -635,13 +635,13 @@ func (s *server) runControllers(config *api.Config) error { s.sharedInformerFactory.Ark().V1().Backups(), s.arkClient.ArkV1(), backupper, - config.BackupStorageProvider.CloudProviderConfig, - config.BackupStorageProvider.Bucket, s.blockStore != nil, s.logger, s.logLevel, s.pluginRegistry, backupTracker, + s.sharedInformerFactory.Ark().V1().BackupStorageLocations(), + s.defaultBackupLocation, s.metrics, ) wg.Add(1) diff --git a/pkg/controller/backup_controller.go b/pkg/controller/backup_controller.go index 36613191b..88f62fbe7 100644 --- a/pkg/controller/backup_controller.go +++ b/pkg/controller/backup_controller.go @@ -57,21 +57,22 @@ import ( const backupVersion = 1 type backupController struct { - backupper backup.Backupper - objectStoreConfig api.CloudProviderConfig - bucket string - pvProviderExists bool - lister listers.BackupLister - listerSynced cache.InformerSynced - client arkv1client.BackupsGetter - syncHandler func(backupName string) error - queue workqueue.RateLimitingInterface - clock clock.Clock - logger logrus.FieldLogger - logLevel logrus.Level - pluginRegistry plugin.Registry - backupTracker BackupTracker - metrics *metrics.ServerMetrics + backupper backup.Backupper + pvProviderExists bool + lister listers.BackupLister + listerSynced cache.InformerSynced + client arkv1client.BackupsGetter + syncHandler func(backupName string) error + queue workqueue.RateLimitingInterface + clock clock.Clock + logger logrus.FieldLogger + logLevel logrus.Level + pluginRegistry plugin.Registry + backupTracker BackupTracker + backupLocationLister listers.BackupStorageLocationLister + backupLocationListerSynced cache.InformerSynced + defaultBackupLocation string + metrics *metrics.ServerMetrics newPluginManager func(logger logrus.FieldLogger, logLevel logrus.Level, pluginRegistry plugin.Registry) plugin.Manager } @@ -80,30 +81,31 @@ func NewBackupController( backupInformer informers.BackupInformer, client arkv1client.BackupsGetter, backupper backup.Backupper, - objectStoreConfig api.CloudProviderConfig, - bucket string, pvProviderExists bool, logger logrus.FieldLogger, logLevel logrus.Level, pluginRegistry plugin.Registry, backupTracker BackupTracker, + backupLocationInformer informers.BackupStorageLocationInformer, + defaultBackupLocation string, metrics *metrics.ServerMetrics, ) Interface { c := &backupController{ - backupper: backupper, - objectStoreConfig: objectStoreConfig, - bucket: bucket, - pvProviderExists: pvProviderExists, - lister: backupInformer.Lister(), - listerSynced: backupInformer.Informer().HasSynced, - client: client, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "backup"), - clock: &clock.RealClock{}, - logger: logger, - logLevel: logLevel, - pluginRegistry: pluginRegistry, - backupTracker: backupTracker, - metrics: metrics, + backupper: backupper, + pvProviderExists: pvProviderExists, + lister: backupInformer.Lister(), + listerSynced: backupInformer.Informer().HasSynced, + client: client, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "backup"), + clock: &clock.RealClock{}, + logger: logger, + logLevel: logLevel, + pluginRegistry: pluginRegistry, + backupTracker: backupTracker, + backupLocationLister: backupLocationInformer.Lister(), + backupLocationListerSynced: backupLocationInformer.Informer().HasSynced, + defaultBackupLocation: defaultBackupLocation, + metrics: metrics, newPluginManager: func(logger logrus.FieldLogger, logLevel logrus.Level, pluginRegistry plugin.Registry) plugin.Manager { return plugin.NewManager(logger, logLevel, pluginRegistry) @@ -165,7 +167,7 @@ func (controller *backupController) Run(ctx context.Context, numWorkers int) err defer controller.logger.Info("Shutting down BackupController") controller.logger.Info("Waiting for caches to sync") - if !cache.WaitForCacheSync(ctx.Done(), controller.listerSynced) { + if !cache.WaitForCacheSync(ctx.Done(), controller.listerSynced, controller.backupLocationListerSynced) { return errors.New("timed out waiting for caches to sync") } controller.logger.Info("Caches are synced") @@ -259,8 +261,9 @@ func (controller *backupController) processBackup(key string) error { backup.Status.Expiration = metav1.NewTime(controller.clock.Now().Add(backup.Spec.TTL.Duration)) } + var backupLocation *api.BackupStorageLocation // validation - if backup.Status.ValidationErrors = controller.getValidationErrors(backup); len(backup.Status.ValidationErrors) > 0 { + if backupLocation, backup.Status.ValidationErrors = controller.getLocationAndValidate(backup, controller.defaultBackupLocation); len(backup.Status.ValidationErrors) > 0 { backup.Status.Phase = api.BackupPhaseFailedValidation } else { backup.Status.Phase = api.BackupPhaseInProgress @@ -287,7 +290,7 @@ func (controller *backupController) processBackup(key string) error { backupScheduleName := backup.GetLabels()["ark-schedule"] controller.metrics.RegisterBackupAttempt(backupScheduleName) - if err := controller.runBackup(backup, controller.bucket); err != nil { + if err := controller.runBackup(backup, backupLocation); err != nil { logContext.WithError(err).Error("backup failed") backup.Status.Phase = api.BackupPhaseFailed controller.metrics.RegisterBackupFailed(backupScheduleName) @@ -327,7 +330,7 @@ func patchBackup(original, updated *api.Backup, client arkv1client.BackupsGetter return res, nil } -func (controller *backupController) getValidationErrors(itm *api.Backup) []string { +func (controller *backupController) getLocationAndValidate(itm *api.Backup, defaultBackupLocation string) (*api.BackupStorageLocation, []string) { var validationErrors []string for _, err := range collections.ValidateIncludesExcludes(itm.Spec.IncludedResources, itm.Spec.ExcludedResources) { @@ -342,10 +345,20 @@ func (controller *backupController) getValidationErrors(itm *api.Backup) []strin validationErrors = append(validationErrors, "Server is not configured for PV snapshots") } - return validationErrors + if itm.Spec.StorageLocation == "" { + itm.Spec.StorageLocation = defaultBackupLocation + } + + var backupLocation *api.BackupStorageLocation + backupLocation, err := controller.backupLocationLister.BackupStorageLocations(itm.Namespace).Get(itm.Spec.StorageLocation) + if err != nil { + validationErrors = append(validationErrors, fmt.Sprintf("Error getting backup storage location: %v", err)) + } + + return backupLocation, validationErrors } -func (controller *backupController) runBackup(backup *api.Backup, bucket string) error { +func (controller *backupController) runBackup(backup *api.Backup, backupLocation *api.BackupStorageLocation) error { log := controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup)) log.Info("Starting backup") backup.Status.StartTimestamp.Time = controller.clock.Now() @@ -382,7 +395,7 @@ func (controller *backupController) runBackup(backup *api.Backup, bucket string) return err } - objectStore, err := getObjectStore(controller.objectStoreConfig, pluginManager) + objectStore, err := getObjectStoreForLocation(backupLocation, pluginManager) if err != nil { return err } @@ -424,7 +437,7 @@ func (controller *backupController) runBackup(backup *api.Backup, bucket string) controller.logger.WithError(err).Error("error closing gzippedLogFile") } - if err := cloudprovider.UploadBackup(log, objectStore, bucket, backup.Name, backupJSONToUpload, backupFileToUpload, logFile); err != nil { + if err := cloudprovider.UploadBackup(log, objectStore, backupLocation.Spec.ObjectStorage.Bucket, backup.Name, backupJSONToUpload, backupFileToUpload, logFile); err != nil { errs = append(errs, err) } @@ -458,6 +471,24 @@ func getObjectStore(cloudConfig api.CloudProviderConfig, manager plugin.Manager) return objectStore, nil } +// TODO(nrb): Consolidate with other implementations +func getObjectStoreForLocation(location *api.BackupStorageLocation, manager plugin.Manager) (cloudprovider.ObjectStore, error) { + if location.Spec.Provider == "" { + return nil, errors.New("backup storage location provider name must not be empty") + } + + objectStore, err := manager.GetObjectStore(location.Spec.Provider) + if err != nil { + return nil, err + } + + if err := objectStore.Init(location.Spec.Config); err != nil { + return nil, err + } + + return objectStore, nil +} + func closeAndRemoveFile(file *os.File, log logrus.FieldLogger) { if err := file.Close(); err != nil { log.WithError(err).WithField("file", file.Name()).Error("error closing file") diff --git a/pkg/controller/backup_controller_test.go b/pkg/controller/backup_controller_test.go index 27a000f1f..49d9c6876 100644 --- a/pkg/controller/backup_controller_test.go +++ b/pkg/controller/backup_controller_test.go @@ -152,6 +152,24 @@ func TestProcessBackup(t *testing.T) { allowSnapshots: true, expectBackup: true, }, + { + name: "Backup without a location will have it set to the default", + key: "heptio-ark/backup1", + backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew), + expectBackup: true, + }, + { + name: "Backup with a location completes", + key: "heptio-ark/backup1", + backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithStorageLocation("loc1"), + expectBackup: true, + }, + { + name: "Backup with non-existent location will fail validation", + key: "heptio-ark/backup1", + backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithStorageLocation("loc2"), + expectBackup: false, + }, } for _, test := range tests { @@ -174,13 +192,13 @@ func TestProcessBackup(t *testing.T) { sharedInformers.Ark().V1().Backups(), client.ArkV1(), backupper, - v1.CloudProviderConfig{Name: "myCloud"}, - "bucket", test.allowSnapshots, logger, logrus.InfoLevel, pluginRegistry, NewBackupTracker(), + sharedInformers.Ark().V1().BackupStorageLocations(), + "default", metrics.NewServerMetrics(), ).(*backupController) @@ -224,6 +242,37 @@ func TestProcessBackup(t *testing.T) { mock.Anything, // actions ).Return(nil) + defaultLocation := &v1.BackupStorageLocation{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: backup.Namespace, + Name: "default", + }, + Spec: v1.BackupStorageLocationSpec{ + Provider: "myCloud", + StorageType: v1.StorageType{ + ObjectStorage: &v1.ObjectStorageLocation{ + Bucket: "bucket", + }, + }, + }, + } + loc1 := &v1.BackupStorageLocation{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: backup.Namespace, + Name: "loc1", + }, + Spec: v1.BackupStorageLocationSpec{ + Provider: "myCloud", + StorageType: v1.StorageType{ + ObjectStorage: &v1.ObjectStorageLocation{ + Bucket: "bucket", + }, + }, + }, + } + require.NoError(t, sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(defaultLocation)) + require.NoError(t, sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(loc1)) + pluginManager.On("GetBackupItemActions").Return(nil, nil) // Ensure we have a CompletionTimestamp when uploading. @@ -312,9 +361,13 @@ func TestProcessBackup(t *testing.T) { StartTimestamp metav1.Time `json:"startTimestamp"` CompletionTimestamp metav1.Time `json:"completionTimestamp"` } + type SpecPatch struct { + StorageLocation string `json:"storageLocation"` + } type Patch struct { Status StatusPatch `json:"status"` + Spec SpecPatch `json:"spec,omitempty"` } decode := func(decoder *json.Decoder) (interface{}, error) { @@ -324,13 +377,27 @@ func TestProcessBackup(t *testing.T) { return *actual, err } - // validate Patch call 1 (setting version, expiration, and phase) - expected := Patch{ - Status: StatusPatch{ - Version: 1, - Phase: v1.BackupPhaseInProgress, - Expiration: expiration, - }, + // validate Patch call 1 (setting version, expiration, phase, and storage location) + var expected Patch + if test.backup.Spec.StorageLocation == "" { + expected = Patch{ + Status: StatusPatch{ + Version: 1, + Phase: v1.BackupPhaseInProgress, + Expiration: expiration, + }, + Spec: SpecPatch{ + StorageLocation: "default", + }, + } + } else { + expected = Patch{ + Status: StatusPatch{ + Version: 1, + Phase: v1.BackupPhaseInProgress, + Expiration: expiration, + }, + } } arktest.ValidatePatch(t, actions[0], expected, decode) diff --git a/pkg/util/test/test_backup.go b/pkg/util/test/test_backup.go index d7f877e51..041dd9e84 100644 --- a/pkg/util/test/test_backup.go +++ b/pkg/util/test/test_backup.go @@ -135,3 +135,8 @@ func (b *TestBackup) WithStartTimestamp(startTime time.Time) *TestBackup { b.Status.StartTimestamp = metav1.Time{Time: startTime} return b } + +func (b *TestBackup) WithStorageLocation(location string) *TestBackup { + b.Spec.StorageLocation = location + return b +}