enable a schedule to be provided as the source for a restore

- ScheduleName is added as an API field to the Restore object
- Restore controller validates that exactly one of BackupName
  or ScheduleName has been provided
- If ScheduleName is provided, Restore controller populates
  BackupName with the name of the most recent successful backup
  created from the schedule
- --from-schedule flag is added to `ark restore create` CLI cmd

Signed-off-by: Steve Kriss <steve@heptio.com>
This commit is contained in:
Steve Kriss
2018-04-20 11:02:59 -07:00
parent f349f85b05
commit 706ae07d0d
7 changed files with 369 additions and 53 deletions

View File

@@ -24,6 +24,7 @@ import (
"io"
"io/ioutil"
"os"
"sort"
"sync"
"time"
@@ -32,6 +33,7 @@ import (
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@@ -45,6 +47,7 @@ import (
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
"github.com/heptio/ark/pkg/plugin"
"github.com/heptio/ark/pkg/restore"
"github.com/heptio/ark/pkg/util/boolptr"
"github.com/heptio/ark/pkg/util/collections"
kubeutil "github.com/heptio/ark/pkg/util/kube"
)
@@ -252,15 +255,8 @@ func (controller *restoreController) processRestore(key string) error {
// don't modify items in the cache
restore = restore.DeepCopy()
excludedResources := sets.NewString(restore.Spec.ExcludedResources...)
for _, nonrestorable := range nonRestorableResources {
if !excludedResources.Has(nonrestorable) {
restore.Spec.ExcludedResources = append(restore.Spec.ExcludedResources, nonrestorable)
}
}
// validation
if restore.Status.ValidationErrors = controller.getValidationErrors(restore); len(restore.Status.ValidationErrors) > 0 {
// complete & validate restore
if restore.Status.ValidationErrors = controller.completeAndValidate(restore); len(restore.Status.ValidationErrors) > 0 {
restore.Status.Phase = api.RestorePhaseFailedValidation
} else {
restore.Status.Phase = api.RestorePhaseInProgress
@@ -304,37 +300,114 @@ func (controller *restoreController) processRestore(key string) error {
return nil
}
func (controller *restoreController) getValidationErrors(itm *api.Restore) []string {
var validationErrors []string
if itm.Spec.BackupName == "" {
validationErrors = append(validationErrors, "BackupName must be non-empty and correspond to the name of a backup in object storage.")
} else if _, err := controller.fetchBackup(controller.bucket, itm.Spec.BackupName); err != nil {
validationErrors = append(validationErrors, fmt.Sprintf("Error retrieving backup: %v", err))
func (controller *restoreController) completeAndValidate(restore *api.Restore) []string {
// add non-restorable resources to restore's excluded resources
excludedResources := sets.NewString(restore.Spec.ExcludedResources...)
for _, nonrestorable := range nonRestorableResources {
if !excludedResources.Has(nonrestorable) {
restore.Spec.ExcludedResources = append(restore.Spec.ExcludedResources, nonrestorable)
}
}
includedResources := sets.NewString(itm.Spec.IncludedResources...)
var validationErrors []string
// validate that included resources don't contain any non-restorable resources
includedResources := sets.NewString(restore.Spec.IncludedResources...)
for _, nonRestorableResource := range nonRestorableResources {
if includedResources.Has(nonRestorableResource) {
validationErrors = append(validationErrors, fmt.Sprintf("%v are non-restorable resources", nonRestorableResource))
}
}
for _, err := range collections.ValidateIncludesExcludes(itm.Spec.IncludedNamespaces, itm.Spec.ExcludedNamespaces) {
validationErrors = append(validationErrors, fmt.Sprintf("Invalid included/excluded namespace lists: %v", err))
}
for _, err := range collections.ValidateIncludesExcludes(itm.Spec.IncludedResources, itm.Spec.ExcludedResources) {
// validate included/excluded resources
for _, err := range collections.ValidateIncludesExcludes(restore.Spec.IncludedResources, restore.Spec.ExcludedResources) {
validationErrors = append(validationErrors, fmt.Sprintf("Invalid included/excluded resource lists: %v", err))
}
if !controller.pvProviderExists && itm.Spec.RestorePVs != nil && *itm.Spec.RestorePVs {
// validate included/excluded namespaces
for _, err := range collections.ValidateIncludesExcludes(restore.Spec.IncludedNamespaces, restore.Spec.ExcludedNamespaces) {
validationErrors = append(validationErrors, fmt.Sprintf("Invalid included/excluded namespace lists: %v", err))
}
// validate that PV provider exists if we're restoring PVs
if boolptr.IsSetToTrue(restore.Spec.RestorePVs) && !controller.pvProviderExists {
validationErrors = append(validationErrors, "Server is not configured for PV snapshot restores")
}
// validate that exactly one of BackupName and ScheduleName have been specified
if !backupXorScheduleProvided(restore) {
return append(validationErrors, "Either a backup or schedule must be specified as a source for the restore, but not both")
}
// if ScheduleName is specified, fill in BackupName with the most recent successful backup from
// the schedule
if restore.Spec.ScheduleName != "" {
selector := labels.SelectorFromSet(labels.Set(map[string]string{
"ark-schedule": restore.Spec.ScheduleName,
}))
backups, err := controller.backupLister.Backups(controller.namespace).List(selector)
if err != nil {
return append(validationErrors, "Unable to list backups for schedule")
}
if len(backups) == 0 {
return append(validationErrors, "No backups found for schedule")
}
if backup := mostRecentCompletedBackup(backups); backup != nil {
restore.Spec.BackupName = backup.Name
} else {
return append(validationErrors, "No completed backups found for schedule")
}
}
// validate that we can fetch the source backup
if _, err := controller.fetchBackup(controller.bucket, restore.Spec.BackupName); err != nil {
return append(validationErrors, fmt.Sprintf("Error retrieving backup: %v", err))
}
return validationErrors
}
// backupXorScheduleProvided returns true if exactly one of BackupName and
// ScheduleName are non-empty for the restore, or false otherwise.
func backupXorScheduleProvided(restore *api.Restore) bool {
if restore.Spec.BackupName != "" && restore.Spec.ScheduleName != "" {
return false
}
if restore.Spec.BackupName == "" && restore.Spec.ScheduleName == "" {
return false
}
return true
}
// mostRecentCompletedBackup returns the most recent backup that's
// completed from a list of backups. Since the backups are expected
// to be from a single schedule, "most recent" is defined as first
// when sorted in reverse alphabetical order by name.
func mostRecentCompletedBackup(backups []*api.Backup) *api.Backup {
sort.Slice(backups, func(i, j int) bool {
// Use '>' because we want descending sort.
// Using Name rather than CreationTimestamp because in the case of
// backups synced into a new cluster, the CreationTimestamp value is
// time of creation in the new cluster rather than time of backup.
// TODO would be useful to have a new API field in backup.status
// that captures the time of backup as a time value (particularly
// for non-scheduled backups).
return backups[i].Name > backups[j].Name
})
for _, backup := range backups {
if backup.Status.Phase == api.BackupPhaseCompleted {
return backup
}
}
return nil
}
func (controller *restoreController) fetchBackup(bucket, name string) (*api.Backup, error) {
backup, err := controller.backupLister.Backups(controller.namespace).Get(name)
if err == nil {