Use backup location in the backup controller

Fixes #739

Signed-off-by: Nolan Brubaker <nolan@heptio.com>
This commit is contained in:
Nolan Brubaker
2018-08-16 18:41:59 -04:00
committed by Steve Kriss
parent 06b5af449f
commit c6f488f75f
4 changed files with 153 additions and 50 deletions

View File

@@ -635,13 +635,13 @@ func (s *server) runControllers(config *api.Config) error {
s.sharedInformerFactory.Ark().V1().Backups(),
s.arkClient.ArkV1(),
backupper,
config.BackupStorageProvider.CloudProviderConfig,
config.BackupStorageProvider.Bucket,
s.blockStore != nil,
s.logger,
s.logLevel,
s.pluginRegistry,
backupTracker,
s.sharedInformerFactory.Ark().V1().BackupStorageLocations(),
s.defaultBackupLocation,
s.metrics,
)
wg.Add(1)

View File

@@ -57,21 +57,22 @@ import (
const backupVersion = 1
type backupController struct {
backupper backup.Backupper
objectStoreConfig api.CloudProviderConfig
bucket string
pvProviderExists bool
lister listers.BackupLister
listerSynced cache.InformerSynced
client arkv1client.BackupsGetter
syncHandler func(backupName string) error
queue workqueue.RateLimitingInterface
clock clock.Clock
logger logrus.FieldLogger
logLevel logrus.Level
pluginRegistry plugin.Registry
backupTracker BackupTracker
metrics *metrics.ServerMetrics
backupper backup.Backupper
pvProviderExists bool
lister listers.BackupLister
listerSynced cache.InformerSynced
client arkv1client.BackupsGetter
syncHandler func(backupName string) error
queue workqueue.RateLimitingInterface
clock clock.Clock
logger logrus.FieldLogger
logLevel logrus.Level
pluginRegistry plugin.Registry
backupTracker BackupTracker
backupLocationLister listers.BackupStorageLocationLister
backupLocationListerSynced cache.InformerSynced
defaultBackupLocation string
metrics *metrics.ServerMetrics
newPluginManager func(logger logrus.FieldLogger, logLevel logrus.Level, pluginRegistry plugin.Registry) plugin.Manager
}
@@ -80,30 +81,31 @@ func NewBackupController(
backupInformer informers.BackupInformer,
client arkv1client.BackupsGetter,
backupper backup.Backupper,
objectStoreConfig api.CloudProviderConfig,
bucket string,
pvProviderExists bool,
logger logrus.FieldLogger,
logLevel logrus.Level,
pluginRegistry plugin.Registry,
backupTracker BackupTracker,
backupLocationInformer informers.BackupStorageLocationInformer,
defaultBackupLocation string,
metrics *metrics.ServerMetrics,
) Interface {
c := &backupController{
backupper: backupper,
objectStoreConfig: objectStoreConfig,
bucket: bucket,
pvProviderExists: pvProviderExists,
lister: backupInformer.Lister(),
listerSynced: backupInformer.Informer().HasSynced,
client: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "backup"),
clock: &clock.RealClock{},
logger: logger,
logLevel: logLevel,
pluginRegistry: pluginRegistry,
backupTracker: backupTracker,
metrics: metrics,
backupper: backupper,
pvProviderExists: pvProviderExists,
lister: backupInformer.Lister(),
listerSynced: backupInformer.Informer().HasSynced,
client: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "backup"),
clock: &clock.RealClock{},
logger: logger,
logLevel: logLevel,
pluginRegistry: pluginRegistry,
backupTracker: backupTracker,
backupLocationLister: backupLocationInformer.Lister(),
backupLocationListerSynced: backupLocationInformer.Informer().HasSynced,
defaultBackupLocation: defaultBackupLocation,
metrics: metrics,
newPluginManager: func(logger logrus.FieldLogger, logLevel logrus.Level, pluginRegistry plugin.Registry) plugin.Manager {
return plugin.NewManager(logger, logLevel, pluginRegistry)
@@ -165,7 +167,7 @@ func (controller *backupController) Run(ctx context.Context, numWorkers int) err
defer controller.logger.Info("Shutting down BackupController")
controller.logger.Info("Waiting for caches to sync")
if !cache.WaitForCacheSync(ctx.Done(), controller.listerSynced) {
if !cache.WaitForCacheSync(ctx.Done(), controller.listerSynced, controller.backupLocationListerSynced) {
return errors.New("timed out waiting for caches to sync")
}
controller.logger.Info("Caches are synced")
@@ -259,8 +261,9 @@ func (controller *backupController) processBackup(key string) error {
backup.Status.Expiration = metav1.NewTime(controller.clock.Now().Add(backup.Spec.TTL.Duration))
}
var backupLocation *api.BackupStorageLocation
// validation
if backup.Status.ValidationErrors = controller.getValidationErrors(backup); len(backup.Status.ValidationErrors) > 0 {
if backupLocation, backup.Status.ValidationErrors = controller.getLocationAndValidate(backup, controller.defaultBackupLocation); len(backup.Status.ValidationErrors) > 0 {
backup.Status.Phase = api.BackupPhaseFailedValidation
} else {
backup.Status.Phase = api.BackupPhaseInProgress
@@ -287,7 +290,7 @@ func (controller *backupController) processBackup(key string) error {
backupScheduleName := backup.GetLabels()["ark-schedule"]
controller.metrics.RegisterBackupAttempt(backupScheduleName)
if err := controller.runBackup(backup, controller.bucket); err != nil {
if err := controller.runBackup(backup, backupLocation); err != nil {
logContext.WithError(err).Error("backup failed")
backup.Status.Phase = api.BackupPhaseFailed
controller.metrics.RegisterBackupFailed(backupScheduleName)
@@ -327,7 +330,7 @@ func patchBackup(original, updated *api.Backup, client arkv1client.BackupsGetter
return res, nil
}
func (controller *backupController) getValidationErrors(itm *api.Backup) []string {
func (controller *backupController) getLocationAndValidate(itm *api.Backup, defaultBackupLocation string) (*api.BackupStorageLocation, []string) {
var validationErrors []string
for _, err := range collections.ValidateIncludesExcludes(itm.Spec.IncludedResources, itm.Spec.ExcludedResources) {
@@ -342,10 +345,20 @@ func (controller *backupController) getValidationErrors(itm *api.Backup) []strin
validationErrors = append(validationErrors, "Server is not configured for PV snapshots")
}
return validationErrors
if itm.Spec.StorageLocation == "" {
itm.Spec.StorageLocation = defaultBackupLocation
}
var backupLocation *api.BackupStorageLocation
backupLocation, err := controller.backupLocationLister.BackupStorageLocations(itm.Namespace).Get(itm.Spec.StorageLocation)
if err != nil {
validationErrors = append(validationErrors, fmt.Sprintf("Error getting backup storage location: %v", err))
}
return backupLocation, validationErrors
}
func (controller *backupController) runBackup(backup *api.Backup, bucket string) error {
func (controller *backupController) runBackup(backup *api.Backup, backupLocation *api.BackupStorageLocation) error {
log := controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup))
log.Info("Starting backup")
backup.Status.StartTimestamp.Time = controller.clock.Now()
@@ -382,7 +395,7 @@ func (controller *backupController) runBackup(backup *api.Backup, bucket string)
return err
}
objectStore, err := getObjectStore(controller.objectStoreConfig, pluginManager)
objectStore, err := getObjectStoreForLocation(backupLocation, pluginManager)
if err != nil {
return err
}
@@ -424,7 +437,7 @@ func (controller *backupController) runBackup(backup *api.Backup, bucket string)
controller.logger.WithError(err).Error("error closing gzippedLogFile")
}
if err := cloudprovider.UploadBackup(log, objectStore, bucket, backup.Name, backupJSONToUpload, backupFileToUpload, logFile); err != nil {
if err := cloudprovider.UploadBackup(log, objectStore, backupLocation.Spec.ObjectStorage.Bucket, backup.Name, backupJSONToUpload, backupFileToUpload, logFile); err != nil {
errs = append(errs, err)
}
@@ -458,6 +471,24 @@ func getObjectStore(cloudConfig api.CloudProviderConfig, manager plugin.Manager)
return objectStore, nil
}
// TODO(nrb): Consolidate with other implementations
// getObjectStoreForLocation instantiates the object store plugin named by the
// given backup storage location's provider and initializes it with the
// location's config. Returns an error if the provider name is empty, the
// plugin cannot be retrieved, or initialization fails.
func getObjectStoreForLocation(location *api.BackupStorageLocation, manager plugin.Manager) (cloudprovider.ObjectStore, error) {
	provider := location.Spec.Provider
	if provider == "" {
		return nil, errors.New("backup storage location provider name must not be empty")
	}

	store, err := manager.GetObjectStore(provider)
	if err != nil {
		return nil, err
	}

	if initErr := store.Init(location.Spec.Config); initErr != nil {
		return nil, initErr
	}

	return store, nil
}
func closeAndRemoveFile(file *os.File, log logrus.FieldLogger) {
if err := file.Close(); err != nil {
log.WithError(err).WithField("file", file.Name()).Error("error closing file")

View File

@@ -152,6 +152,24 @@ func TestProcessBackup(t *testing.T) {
allowSnapshots: true,
expectBackup: true,
},
{
name: "Backup without a location will have it set to the default",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew),
expectBackup: true,
},
{
name: "Backup with a location completes",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithStorageLocation("loc1"),
expectBackup: true,
},
{
name: "Backup with non-existent location will fail validation",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithStorageLocation("loc2"),
expectBackup: false,
},
}
for _, test := range tests {
@@ -174,13 +192,13 @@ func TestProcessBackup(t *testing.T) {
sharedInformers.Ark().V1().Backups(),
client.ArkV1(),
backupper,
v1.CloudProviderConfig{Name: "myCloud"},
"bucket",
test.allowSnapshots,
logger,
logrus.InfoLevel,
pluginRegistry,
NewBackupTracker(),
sharedInformers.Ark().V1().BackupStorageLocations(),
"default",
metrics.NewServerMetrics(),
).(*backupController)
@@ -224,6 +242,37 @@ func TestProcessBackup(t *testing.T) {
mock.Anything, // actions
).Return(nil)
defaultLocation := &v1.BackupStorageLocation{
ObjectMeta: metav1.ObjectMeta{
Namespace: backup.Namespace,
Name: "default",
},
Spec: v1.BackupStorageLocationSpec{
Provider: "myCloud",
StorageType: v1.StorageType{
ObjectStorage: &v1.ObjectStorageLocation{
Bucket: "bucket",
},
},
},
}
loc1 := &v1.BackupStorageLocation{
ObjectMeta: metav1.ObjectMeta{
Namespace: backup.Namespace,
Name: "loc1",
},
Spec: v1.BackupStorageLocationSpec{
Provider: "myCloud",
StorageType: v1.StorageType{
ObjectStorage: &v1.ObjectStorageLocation{
Bucket: "bucket",
},
},
},
}
require.NoError(t, sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(defaultLocation))
require.NoError(t, sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(loc1))
pluginManager.On("GetBackupItemActions").Return(nil, nil)
// Ensure we have a CompletionTimestamp when uploading.
@@ -312,9 +361,13 @@ func TestProcessBackup(t *testing.T) {
StartTimestamp metav1.Time `json:"startTimestamp"`
CompletionTimestamp metav1.Time `json:"completionTimestamp"`
}
type SpecPatch struct {
StorageLocation string `json:"storageLocation"`
}
type Patch struct {
Status StatusPatch `json:"status"`
Spec SpecPatch `json:"spec,omitempty"`
}
decode := func(decoder *json.Decoder) (interface{}, error) {
@@ -324,13 +377,27 @@ func TestProcessBackup(t *testing.T) {
return *actual, err
}
// validate Patch call 1 (setting version, expiration, and phase)
expected := Patch{
Status: StatusPatch{
Version: 1,
Phase: v1.BackupPhaseInProgress,
Expiration: expiration,
},
// validate Patch call 1 (setting version, expiration, phase, and storage location)
var expected Patch
if test.backup.Spec.StorageLocation == "" {
expected = Patch{
Status: StatusPatch{
Version: 1,
Phase: v1.BackupPhaseInProgress,
Expiration: expiration,
},
Spec: SpecPatch{
StorageLocation: "default",
},
}
} else {
expected = Patch{
Status: StatusPatch{
Version: 1,
Phase: v1.BackupPhaseInProgress,
Expiration: expiration,
},
}
}
arktest.ValidatePatch(t, actions[0], expected, decode)

View File

@@ -135,3 +135,8 @@ func (b *TestBackup) WithStartTimestamp(startTime time.Time) *TestBackup {
b.Status.StartTimestamp = metav1.Time{Time: startTime}
return b
}
// WithStorageLocation sets the backup's StorageLocation spec field to the
// given location name and returns the TestBackup to support chained
// builder-style construction in tests.
func (b *TestBackup) WithStorageLocation(name string) *TestBackup {
	b.Spec.StorageLocation = name
	return b
}