Restore progress reporting bug fix (#3583)

* Improve readability and formatting of pkg/restore/restore.go

Signed-off-by: F. Gold <fgold@vmware.com>

* Update paths to include API group versions

Signed-off-by: F. Gold <fgold@vmware.com>

* Use full word, 'resource' instead of 'resrc'

Signed-off-by: F. Gold <fgold@vmware.com>
This commit is contained in:
codegold79
2021-03-15 15:51:07 -07:00
committed by GitHub
parent 70287f00f9
commit c8dfd648bb

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"io"
"io/ioutil"
"path/filepath"
"sort"
"strings"
"sync"
@@ -175,10 +176,14 @@ func (kr *kubernetesRestorer) Restore(
return Result{}, Result{Velero: []string{err.Error()}}
}
// get resource includes-excludes
resourceIncludesExcludes := collections.GetResourceIncludesExcludes(kr.discoveryHelper, req.Restore.Spec.IncludedResources, req.Restore.Spec.ExcludedResources)
// Get resource includes-excludes.
resourceIncludesExcludes := collections.GetResourceIncludesExcludes(
kr.discoveryHelper,
req.Restore.Spec.IncludedResources,
req.Restore.Spec.ExcludedResources,
)
// get namespace includes-excludes
// Get namespace includes-excludes.
namespaceIncludesExcludes := collections.NewIncludesExcludes().
Includes(req.Restore.Spec.IncludedNamespaces...).
Excludes(req.Restore.Spec.ExcludedNamespaces...)
@@ -192,7 +197,10 @@ func (kr *kubernetesRestorer) Restore(
if val := req.Restore.Annotations[velerov1api.PodVolumeOperationTimeoutAnnotation]; val != "" {
parsed, err := time.ParseDuration(val)
if err != nil {
req.Log.WithError(errors.WithStack(err)).Errorf("Unable to parse pod volume timeout annotation %s, using server value.", val)
req.Log.WithError(errors.WithStack(err)).Errorf(
"Unable to parse pod volume timeout annotation %s, using server value.",
val,
)
} else {
podVolumeTimeout = parsed
}
@@ -352,9 +360,10 @@ type resourceClientKey struct {
namespace string
}
// getOrderedResources returns an ordered list of resource identifiers to restore, based on the provided resource
// priorities and backup contents. The returned list begins with all of the prioritized resources (in order), and
// appends to that an alphabetized list of all resources in the backup.
// getOrderedResources returns an ordered list of resource identifiers to restore,
// based on the provided resource priorities and backup contents. The returned list
// begins with all of the prioritized resources (in order), and appends to that
// an alphabetized list of all resources in the backup.
func getOrderedResources(resourcePriorities []string, backupResources map[string]*archive.ResourceItems) []string {
// alphabetize resources in the backup
orderedBackupResources := make([]string, 0, len(backupResources))
@@ -363,7 +372,8 @@ func getOrderedResources(resourcePriorities []string, backupResources map[string
}
sort.Strings(orderedBackupResources)
// main list: everything in resource priorities, followed by what's in the backup (alphabetized)
// Main list: everything in resource priorities, followed by what's in the
// backup (alphabetized).
return append(resourcePriorities, orderedBackupResources...)
}
@@ -380,7 +390,7 @@ func (ctx *restoreContext) execute() (Result, Result) {
}
defer ctx.fileSystem.RemoveAll(dir)
// need to set this for additionalItems to be restored
// Need to set this for additionalItems to be restored.
ctx.restoreDir = dir
backupResources, err := archive.NewParser(ctx.log, ctx.fileSystem).Parse(ctx.restoreDir)
@@ -423,9 +433,21 @@ func (ctx *restoreContext) execute() (Result, Result) {
lastUpdate = &val
case <-ticker.C:
if lastUpdate != nil {
patch := fmt.Sprintf(`{"status":{"progress":{"totalItems":%d,"itemsRestored":%d}}}`, lastUpdate.totalItems, lastUpdate.itemsRestored)
if _, err := ctx.restoreClient.Restores(ctx.restore.Namespace).Patch(go_context.TODO(), ctx.restore.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil {
ctx.log.WithError(errors.WithStack((err))).Warn("Got error trying to update restore's status.progress")
patch := fmt.Sprintf(
`{"status":{"progress":{"totalItems":%d,"itemsRestored":%d}}}`,
lastUpdate.totalItems,
lastUpdate.itemsRestored,
)
_, err := ctx.restoreClient.Restores(ctx.restore.Namespace).Patch(
go_context.TODO(),
ctx.restore.Name,
types.MergePatchType,
[]byte(patch),
metav1.PatchOptions{},
)
if err != nil {
ctx.log.WithError(errors.WithStack((err))).
Warn("Got error trying to update restore's status.progress")
}
lastUpdate = nil
}
@@ -433,85 +455,120 @@ func (ctx *restoreContext) execute() (Result, Result) {
}
}()
// i: iteration counter, totalItems: previously discovered items,
// totalItems: previously discovered items, i: iteration counter.
totalItems, i, existingNamespaces := 0, 0, sets.NewString()
for _, selectedResource := range selectedResourceCollection {
totalItems += selectedResource.totalItems
}
for _, selectedResource := range selectedResourceCollection {
groupResource := schema.ParseGroupResource(selectedResource.resource)
for namespace, selectedItems := range selectedResource.selectedItemsByNamespace {
for _, selectedItem := range selectedItems {
// if we don't know whether this namespace exists yet, attempt to create
// If we don't know whether this namespace exists yet, attempt to create
// it in order to ensure it exists. Try to get it from the backup tarball
// (in order to get any backed-up metadata), but if we don't find it there,
// create a blank one.
if namespace != "" && !existingNamespaces.Has(selectedItem.targetNamespace) {
logger := ctx.log.WithField("namespace", namespace)
ns := getNamespace(logger, archive.GetItemFilePath(ctx.restoreDir, "namespaces", "", namespace), selectedItem.targetNamespace)
if _, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(ns, ctx.namespaceClient, ctx.resourceTerminatingTimeout); err != nil {
ns := getNamespace(
logger,
archive.GetItemFilePath(ctx.restoreDir, "namespaces", "", namespace),
selectedItem.targetNamespace,
)
_, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(
ns,
ctx.namespaceClient,
ctx.resourceTerminatingTimeout,
)
if err != nil {
errs.AddVeleroError(err)
continue
} else {
// add the newly created namespace to the list of restored items
if nsCreated {
itemKey := velero.ResourceIdentifier{
GroupResource: kuberesource.Namespaces,
Namespace: ns.Namespace,
Name: ns.Name,
}
ctx.restoredItems[itemKey] = struct{}{}
}
}
// keep track of namespaces that we know exist so we don't
// have to try to create them multiple times
// Add the newly created namespace to the list of restored items.
if nsCreated {
itemKey := velero.ResourceIdentifier{
GroupResource: kuberesource.Namespaces,
Namespace: ns.Namespace,
Name: ns.Name,
}
ctx.restoredItems[itemKey] = struct{}{}
}
// Keep track of namespaces that we know exist so we don't
// have to try to create them multiple times.
existingNamespaces.Insert(selectedItem.targetNamespace)
}
obj, err := archive.Unmarshal(ctx.fileSystem, selectedItem.path)
if err != nil {
errs.Add(selectedItem.targetNamespace, fmt.Errorf("error decoding %q: %v", strings.Replace(selectedItem.path, ctx.restoreDir+"/", "", -1), err))
errs.Add(
selectedItem.targetNamespace,
fmt.Errorf(
"error decoding %q: %v",
strings.Replace(selectedItem.path, ctx.restoreDir+"/", "", -1),
err,
),
)
continue
}
w, e := ctx.restoreItem(obj, groupResource, selectedItem.targetNamespace)
warnings.Merge(&w)
errs.Merge(&e)
i++
// totalItems keeps the count of items previously known
// there may be additional items restored by plugins
// we want to include the additional items by looking at restoredItems
// at the same time, we don't want previously known items counted twice
// as they are present in both restoredItems and totalItems
// totalItems keeps the count of items previously known. There
// may be additional items restored by plugins. We want to include
// the additional items by looking at restoredItems at the same
// time, we don't want previously known items counted twice as
// they are present in both restoredItems and totalItems.
actualTotalItems := len(ctx.restoredItems) + (totalItems - i)
update <- progressUpdate{
totalItems: actualTotalItems,
itemsRestored: len(ctx.restoredItems),
}
warnings.Merge(&w)
errs.Merge(&e)
}
}
// if we just restored custom resource definitions (CRDs), refresh discovery
// because the restored CRDs may have created new APIs that didn't previously
// exist in the cluster, and we want to be able to resolve & restore instances
// of them in subsequent loop iterations.
// If we just restored custom resource definitions (CRDs), refresh
// discovery because the restored CRDs may have created new APIs that
// didn't previously exist in the cluster, and we want to be able to
// resolve & restore instances of them in subsequent loop iterations.
if groupResource == kuberesource.CustomResourceDefinitions {
if err := ctx.discoveryHelper.Refresh(); err != nil {
warnings.Add("", errors.Wrap(err, "error refreshing discovery after restoring custom resource definitions"))
warnings.Add("", errors.Wrap(err, "refresh discovery after restoring CRDs"))
}
}
}
// close the progress update channel
// Close the progress update channel.
quit <- struct{}{}
// do a final progress update as stopping the ticker might have left last few updates from taking place
patch := fmt.Sprintf(`{"status":{"progress":{"totalItems":%d,"itemsRestored":%d}}}`, len(ctx.restoredItems), len(ctx.restoredItems))
if _, err := ctx.restoreClient.Restores(ctx.restore.Namespace).Patch(go_context.TODO(), ctx.restore.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil {
ctx.log.WithError(errors.WithStack((err))).Warn("Got error trying to update restore's status.progress")
// Do a final progress update as stopping the ticker might have left last few
// updates from taking place.
patch := fmt.Sprintf(
`{"status":{"progress":{"totalItems":%d,"itemsRestored":%d}}}`,
len(ctx.restoredItems),
len(ctx.restoredItems),
)
_, err = ctx.restoreClient.Restores(ctx.restore.Namespace).Patch(
go_context.TODO(),
ctx.restore.Name,
types.MergePatchType,
[]byte(patch),
metav1.PatchOptions{},
)
if err != nil {
ctx.log.WithError(errors.WithStack((err))).Warn("Updating restore status.progress")
}
// wait for all of the restic restore goroutines to be done, which is
// Wait for all of the restic restore goroutines to be done, which is
// only possible once all of their errors have been received by the loop
// below, then close the resticErrs channel so the loop terminates.
go func() {
@@ -522,18 +579,18 @@ func (ctx *restoreContext) execute() (Result, Result) {
close(ctx.resticErrs)
}()
// this loop will only terminate when the ctx.resticErrs channel is closed
// This loop will only terminate when the ctx.resticErrs channel is closed
// in the above goroutine, *after* all errors from the goroutines have been
// received by this loop.
for err := range ctx.resticErrs {
// TODO not ideal to be adding these to Velero-level errors
// TODO: not ideal to be adding these to Velero-level errors
// rather than a specific namespace, but don't have a way
// to track the namespace right now.
errs.Velero = append(errs.Velero, err.Error())
}
ctx.log.Info("Done waiting for all restic restores to complete")
// wait for all post-restore exec hooks with same logic as restic wait above
// Wait for all post-restore exec hooks with same logic as restic wait above.
go func() {
ctx.log.Info("Waiting for all post-restore-exec hooks to complete")
@@ -670,11 +727,12 @@ func (ctx *restoreContext) shouldRestore(name string, pvClient client.Dynamic) (
return false, nil
}
// Check the namespace associated with the claimRef to see if it's deleting/terminating before proceeding
// Check the namespace associated with the claimRef to see if it's
// deleting/terminating before proceeding.
ns, err := ctx.namespaceClient.Get(go_context.TODO(), namespace, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
pvLogger.Debugf("namespace %s for PV not found, waiting", namespace)
// namespace not found but the PV still exists, so continue to wait
// Namespace not found but the PV still exists, so continue to wait.
return false, nil
}
if err != nil {
@@ -683,7 +741,7 @@ func (ctx *restoreContext) shouldRestore(name string, pvClient client.Dynamic) (
if ns != nil && (ns.GetDeletionTimestamp() != nil || ns.Status.Phase == v1.NamespaceTerminating) {
pvLogger.Debugf("namespace %s associated with PV is deleting, waiting", namespace)
// namespace is in the process of deleting, keep looping
// Namespace is in the process of deleting, keep looping.
return false, nil
}
@@ -699,22 +757,25 @@ func (ctx *restoreContext) shouldRestore(name string, pvClient client.Dynamic) (
return shouldRestore, err
}
// crdAvailable waits for a CRD to be available for use before letting the restore continue.
// crdAvailable waits for a CRD to be available for use before letting the
// restore continue.
func (ctx *restoreContext) crdAvailable(name string, crdClient client.Dynamic) (bool, error) {
crdLogger := ctx.log.WithField("crdName", name)
var available bool
// Wait 1 minute rather than the standard resource timeout, since each CRD will transition fairly quickly
// Wait 1 minute rather than the standard resource timeout, since each CRD
// will transition fairly quickly.
err := wait.PollImmediate(time.Second, time.Minute*1, func() (bool, error) {
unstructuredCRD, err := crdClient.Get(name, metav1.GetOptions{})
if err != nil {
return true, err
}
// TODO: Due to upstream conversion issues in runtime.FromUnstructured, we use the unstructured object here.
// Once the upstream conversion functions are fixed, we should convert to the CRD types and use IsCRDReady
// TODO: Due to upstream conversion issues in runtime.FromUnstructured,
// we use the unstructured object here. Once the upstream conversion
// functions are fixed, we should convert to the CRD types and use
// IsCRDReady.
available, err = kube.IsUnstructuredCRDReady(unstructuredCRD)
if err != nil {
return true, err
}
@@ -723,8 +784,8 @@ func (ctx *restoreContext) crdAvailable(name string, crdClient client.Dynamic) (
crdLogger.Debug("CRD not yet ready for use")
}
// If the CRD is not available, keep polling (false, nil)
// If the CRD is available, break the poll and return back to caller (true, nil)
// If the CRD is not available, keep polling (false, nil).
// If the CRD is available, break the poll and return to caller (true, nil).
return available, nil
})
@@ -745,8 +806,8 @@ func (ctx *restoreContext) getResourceClient(groupResource schema.GroupResource,
return client, nil
}
// initialize client for this Resource. we need
// metadata from an object to do this.
// Initialize client for this resource. We need metadata from an object to
// do this.
ctx.log.Infof("Getting client for %v", obj.GroupVersionKind())
resource := metav1.APIResource{
@@ -803,15 +864,15 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
return warnings, errs
}
// if the namespace scoped resource should be restored, ensure that the namespace into
// which the resource is being restored into exists.
// If the namespace scoped resource should be restored, ensure that the
// namespace into which the resource is being restored into exists.
// This is the *remapped* namespace that we are ensuring exists.
nsToEnsure := getNamespace(ctx.log, archive.GetItemFilePath(ctx.restoreDir, "namespaces", "", obj.GetNamespace()), namespace)
if _, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(nsToEnsure, ctx.namespaceClient, ctx.resourceTerminatingTimeout); err != nil {
errs.AddVeleroError(err)
return warnings, errs
} else {
// add the newly created namespace to the list of restored items
// Add the newly created namespace to the list of restored items.
if nsCreated {
itemKey := velero.ResourceIdentifier{
GroupResource: kuberesource.Namespaces,
@@ -832,8 +893,8 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
}
}
// make a copy of object retrieved from backup
// to make it available unchanged inside restore actions
// Make a copy of object retrieved from backup to make it available unchanged
// inside restore actions.
itemFromBackup := obj.DeepCopy()
complete, err := isCompleted(obj, groupResource)
@@ -848,7 +909,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
name := obj.GetName()
// Check if we've already restored this
// Check if we've already restored this itemKey.
itemKey := velero.ResourceIdentifier{
GroupResource: groupResource,
Namespace: namespace,
@@ -860,7 +921,8 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
}
ctx.restoredItems[itemKey] = struct{}{}
// TODO: move to restore item action if/when we add a ShouldRestore() method to the interface
// TODO: move to restore item action if/when we add a ShouldRestore() method
// to the interface.
if groupResource == kuberesource.Pods && obj.GetAnnotations()[v1.MirrorPodAnnotationKey] != "" {
ctx.log.Infof("Not restoring pod because it's a mirror pod")
return warnings, errs
@@ -882,7 +944,8 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
return warnings, errs
}
// Check to see if the claimRef.namespace field needs to be remapped, and do so if necessary.
// Check to see if the claimRef.namespace field needs to be remapped,
// and do so if necessary.
_, err = remapClaimRefNS(ctx, obj)
if err != nil {
errs.Add(namespace, err)
@@ -899,17 +962,18 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
return warnings, errs
}
} else {
// if we're renaming the PV, we're going to give it a new random name,
// If we're renaming the PV, we're going to give it a new random name,
// so we can assume it doesn't already exist in the cluster and therefore
// we should proceed with restoring from snapshot.
shouldRestoreSnapshot = true
}
if shouldRestoreSnapshot {
// reset the PV's binding status so that Kubernetes can properly associate it with the restored PVC.
// Reset the PV's binding status so that Kubernetes can properly
// associate it with the restored PVC.
obj = resetVolumeBindingInfo(obj)
// even if we're renaming the PV, obj still has the old name here, because the pvRestorer
// Even if we're renaming the PV, obj still has the old name here, because the pvRestorer
// uses the original name to look up metadata about the snapshot.
ctx.log.Infof("Restoring persistent volume from snapshot.")
updatedObj, err := ctx.pvRestorer.executePVAction(obj)
@@ -919,7 +983,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
}
obj = updatedObj
// VolumeSnapshotter has modified the PV name, we should rename the PV
// VolumeSnapshotter has modified the PV name, we should rename the PV.
if oldName != obj.GetName() {
shouldRenamePV = true
}
@@ -928,21 +992,22 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
if shouldRenamePV {
var pvName string
if oldName == obj.GetName() {
// pvRestorer hasn't modified the PV name, we need to rename the PV
// pvRestorer hasn't modified the PV name, we need to rename the PV.
pvName, err = ctx.pvRenamer(oldName)
if err != nil {
errs.Add(namespace, errors.Wrapf(err, "error renaming PV"))
return warnings, errs
}
} else {
// VolumeSnapshotter could have modified the PV name through function `SetVolumeID`,
// VolumeSnapshotter could have modified the PV name through
// function `SetVolumeID`,
pvName = obj.GetName()
}
ctx.renamedPVs[oldName] = pvName
obj.SetName(pvName)
// add the original PV name as an annotation
// Add the original PV name as an annotation.
annotations := obj.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
@@ -955,22 +1020,24 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
ctx.log.Infof("Dynamically re-provisioning persistent volume because it has a restic backup to be restored.")
ctx.pvsToProvision.Insert(name)
// return early because we don't want to restore the PV itself, we want to dynamically re-provision it.
// Return early because we don't want to restore the PV itself, we
// want to dynamically re-provision it.
return warnings, errs
case hasDeleteReclaimPolicy(obj.Object):
ctx.log.Infof("Dynamically re-provisioning persistent volume because it doesn't have a snapshot and its reclaim policy is Delete.")
ctx.pvsToProvision.Insert(name)
// return early because we don't want to restore the PV itself, we want to dynamically re-provision it.
// Return early because we don't want to restore the PV itself, we
// want to dynamically re-provision it.
return warnings, errs
default:
ctx.log.Infof("Restoring persistent volume as-is because it doesn't have a snapshot and its reclaim policy is not Delete.")
obj = resetVolumeBindingInfo(obj)
// we call the pvRestorer here to clear out the PV's claimRef.UID, so it can be re-claimed
// when its PVC is restored and gets a new UID.
// We call the pvRestorer here to clear out the PV's claimRef.UID,
// so it can be re-claimed when its PVC is restored and gets a new UID.
updatedObj, err := ctx.pvRestorer.executePVAction(obj)
if err != nil {
errs.Add(namespace, fmt.Errorf("error executing PVAction for %s: %v", resourceID, err))
@@ -980,7 +1047,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
}
}
// clear out non-core metadata fields & status
// Clear out non-core metadata fields and status.
if obj, err = resetMetadataAndStatus(obj); err != nil {
errs.Add(namespace, err)
return warnings, errs
@@ -1084,16 +1151,16 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
}
}
// necessary because we may have remapped the namespace
// if the namespace is blank, don't create the key
// Necessary because we may have remapped the namespace if the namespace is
// blank, don't create the key.
originalNamespace := obj.GetNamespace()
if namespace != "" {
obj.SetNamespace(namespace)
}
// label the resource with the restore's name and the restored backup's name
// Label the resource with the restore's name and the restored backup's name
// for easy identification of all cluster resources created by this restore
// and which backup they came from
// and which backup they came from.
addRestoreLabels(obj, ctx.restore.Name, ctx.restore.Spec.BackupName)
ctx.log.Infof("Attempting to restore %s: %v", obj.GroupVersionKind().Kind, name)
@@ -1105,7 +1172,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
warnings.Add(namespace, err)
return warnings, errs
}
// Remove insubstantial metadata
// Remove insubstantial metadata.
fromCluster, err = resetMetadataAndStatus(fromCluster)
if err != nil {
ctx.log.Infof("Error trying to reset metadata for %s: %v", kube.NamespaceAndName(obj), err)
@@ -1113,8 +1180,8 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
return warnings, errs
}
// We know the object from the cluster won't have the backup/restore name labels, so
// copy them from the object we attempted to restore.
// We know the object from the cluster won't have the backup/restore name
// labels, so copy them from the object we attempted to restore.
labels := obj.GetLabels()
addRestoreLabels(fromCluster, labels[velerov1api.RestoreNameLabel], labels[velerov1api.BackupNameLabel])
@@ -1136,7 +1203,8 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
}
if patchBytes == nil {
// In-cluster and desired state are the same, so move on to the next item
// In-cluster and desired state are the same, so move on to
// the next item.
return warnings, errs
}
@@ -1157,7 +1225,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
return warnings, errs
}
// Error was something other than an AlreadyExists
// Error was something other than an AlreadyExists.
if restoreErr != nil {
ctx.log.Errorf("error restoring %s: %+v", name, restoreErr)
errs.Add(namespace, fmt.Errorf("error restoring %s: %v", resourceID, restoreErr))
@@ -1186,11 +1254,11 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
return warnings, errs
}
// shouldRenamePV returns a boolean indicating whether a persistent volume should be given a new name
// before being restored, or an error if this cannot be determined. A persistent volume will be
// given a new name if and only if (a) a PV with the original name already exists in-cluster, and
// (b) in the backup, the PV is claimed by a PVC in a namespace that's being remapped during the
// restore.
// shouldRenamePV returns a boolean indicating whether a persistent volume should
// be given a new name before being restored, or an error if this cannot be determined.
// A persistent volume will be given a new name if and only if (a) a PV with the
// original name already exists in-cluster, and (b) in the backup, the PV is claimed
// by a PVC in a namespace that's being remapped during the restore.
func shouldRenamePV(ctx *restoreContext, obj *unstructured.Unstructured, client client.Dynamic) (bool, error) {
if len(ctx.restore.Spec.NamespaceMapping) == 0 {
ctx.log.Debugf("Persistent volume does not need to be renamed because restore is not remapping any namespaces")
@@ -1221,19 +1289,21 @@ func shouldRenamePV(ctx *restoreContext, obj *unstructured.Unstructured, client
return false, errors.Wrapf(err, "error checking if persistent volume exists in the cluster")
}
// no error returned: the PV was found in-cluster, so we need to rename it
// No error returned: the PV was found in-cluster, so we need to rename it.
return true, nil
}
// remapClaimRefNS remaps a PersistentVolume's claimRef.Namespace based on a restore's NamespaceMappings, if necessary.
// Returns true if the namespace was remapped, false if it was not required.
// remapClaimRefNS remaps a PersistentVolume's claimRef.Namespace based on a
// restore's NamespaceMappings, if necessary. Returns true if the namespace was
// remapped, false if it was not required.
func remapClaimRefNS(ctx *restoreContext, obj *unstructured.Unstructured) (bool, error) {
if len(ctx.restore.Spec.NamespaceMapping) == 0 {
ctx.log.Debug("Persistent volume does not need to have the claimRef.namespace remapped because restore is not remapping any namespaces")
return false, nil
}
// Conversion to the real type here is more readable than all the error checking involved with reading each field individually.
// Conversion to the real type here is more readable than all the error checking
// involved with reading each field individually.
pv := new(v1.PersistentVolume)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, pv); err != nil {
return false, errors.Wrapf(err, "error converting persistent volume to structured")
@@ -1266,8 +1336,8 @@ func restorePodVolumeBackups(ctx *restoreContext, createdObj *unstructured.Unstr
} else {
ctx.resticWaitGroup.Add(1)
go func() {
// Done() will only be called after all errors have been successfully sent
// on the ctx.resticErrs channel
// Done() will only be called after all errors have been successfully
// sent on the ctx.resticErrs channel
defer ctx.resticWaitGroup.Done()
pod := new(v1.Pod)
@@ -1295,12 +1365,12 @@ func restorePodVolumeBackups(ctx *restoreContext, createdObj *unstructured.Unstr
}
}
// waitExec executes hooks in a restored pod's containers when they become ready
// waitExec executes hooks in a restored pod's containers when they become ready.
func (ctx *restoreContext) waitExec(createdObj *unstructured.Unstructured) {
ctx.hooksWaitGroup.Add(1)
go func() {
// Done() will only be called after all errors have been successfully sent
// on the ctx.resticErrs channel
// on the ctx.resticErrs channel.
defer ctx.hooksWaitGroup.Done()
pod := new(v1.Pod)
@@ -1325,7 +1395,7 @@ func (ctx *restoreContext) waitExec(createdObj *unstructured.Unstructured) {
ctx.hooksCancelFunc()
for _, err := range errs {
// Errors are already logged in the HandleHooks method
// Errors are already logged in the HandleHooks method.
ctx.hooksErrs <- err
}
}
@@ -1373,24 +1443,31 @@ func hasDeleteReclaimPolicy(obj map[string]interface{}) bool {
return policy == string(v1.PersistentVolumeReclaimDelete)
}
// resetVolumeBindingInfo clears any necessary metadata out of a PersistentVolume or PersistentVolumeClaim that would make it ineligible to be re-bound by Velero.
// resetVolumeBindingInfo clears any necessary metadata out of a PersistentVolume
// or PersistentVolumeClaim that would make it ineligible to be re-bound by Velero.
func resetVolumeBindingInfo(obj *unstructured.Unstructured) *unstructured.Unstructured {
// Clean out ClaimRef UID and resourceVersion, since this information is highly unique.
// Clean out ClaimRef UID and resourceVersion, since this information is
// highly unique.
unstructured.RemoveNestedField(obj.Object, "spec", "claimRef", "uid")
unstructured.RemoveNestedField(obj.Object, "spec", "claimRef", "resourceVersion")
// Clear out any annotations used by the Kubernetes PV controllers to track bindings.
// Clear out any annotations used by the Kubernetes PV controllers to track
// bindings.
annotations := obj.GetAnnotations()
// Upon restore, this new PV will look like a statically provisioned, manually-bound volume rather than one bound by the controller, so remove the annotation that signals that a controller bound it.
// Upon restore, this new PV will look like a statically provisioned, manually-
// bound volume rather than one bound by the controller, so remove the annotation
// that signals that a controller bound it.
delete(annotations, KubeAnnBindCompleted)
// Remove the annotation that signals that the PV is already bound; we want the PV(C) controller to take the two objects and bind them again.
// Remove the annotation that signals that the PV is already bound; we want
// the PV(C) controller to take the two objects and bind them again.
delete(annotations, KubeAnnBoundByController)
// Remove the provisioned-by annotation which signals that the persistent volume was dynamically provisioned; it is now statically provisioned.
// Remove the provisioned-by annotation which signals that the persistent
// volume was dynamically provisioned; it is now statically provisioned.
delete(annotations, KubeAnnDynamicallyProvisioned)
// GetAnnotations returns a copy, so we have to set them again
// GetAnnotations returns a copy, so we have to set them again.
obj.SetAnnotations(annotations)
return obj
@@ -1420,8 +1497,8 @@ func resetMetadataAndStatus(obj *unstructured.Unstructured) (*unstructured.Unstr
return obj, nil
}
// addRestoreLabels labels the provided object with the restore name and
// the restored backup's name.
// addRestoreLabels labels the provided object with the restore name and the
// restored backup's name.
func addRestoreLabels(obj metav1.Object, restoreName, backupName string) {
labels := obj.GetLabels()
@@ -1435,8 +1512,9 @@ func addRestoreLabels(obj metav1.Object, restoreName, backupName string) {
obj.SetLabels(labels)
}
// isCompleted returns whether or not an object is considered completed.
// Used to identify whether or not an object should be restored. Only Jobs or Pods are considered
// isCompleted returns whether or not an object is considered completed. Used to
// identify whether or not an object should be restored. Only Jobs or Pods are
// considered.
func isCompleted(obj *unstructured.Unstructured, groupResource schema.GroupResource) (bool, error) {
switch groupResource {
case kuberesource.Pods:
@@ -1457,42 +1535,48 @@ func isCompleted(obj *unstructured.Unstructured, groupResource schema.GroupResou
return true, nil
}
}
// Assume any other resource isn't complete and can be restored
// Assume any other resource isn't complete and can be restored.
return false, nil
}
// restoreableResource represents map of individual items of
// each resource identifier grouped by their original namespaces
// restoreableResource represents map of individual items of each resource
// identifier grouped by their original namespaces.
type restoreableResource struct {
resource string
selectedItemsByNamespace map[string][]restoreableItem
totalItems int
}
// restoreableItem represents an item by its target namespace
// contains enough information required to restore the item
// restoreableItem represents an item by its target namespace contains enough
// information required to restore the item.
type restoreableItem struct {
path string
targetNamespace string
name string
}
// getOrderedResourceCollection iterates over list of ordered resource idenitifiers, applies resource include/exclude criteria,
// and Kubernetes selectors to make a list of resources to be actually restored preserving the original order
// getOrderedResourceCollection iterates over list of ordered resource
// identifiers, applies resource include/exclude criteria, and Kubernetes
// selectors to make a list of resources to be actually restored preserving the
// original order.
func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[string]*archive.ResourceItems) ([]restoreableResource, Result, Result) {
var warnings, errs Result
processedResources := sets.NewString()
restoreResourceCollection := make([]restoreableResource, 0)
// Iterate through an ordered list of resources to restore, checking each one to see if it should be restored.
// Note that resources *may* be in this list twice, i.e. once due to being a prioritized resource, and once due
// to being in the backup tarball. We can't de-dupe this upfront, because it's possible that items in the prioritized
// resources list may not be fully resolved group-resource strings (e.g. may be specified as "po" instead of "pods"),
// and we don't want to fully resolve them via discovery until we reach them in the loop, because it is possible
// that the resource/API itself is being restored via a custom resource definition, meaning it's not available via
// discovery prior to beginning the restore.
// Iterate through an ordered list of resources to restore, checking each
// one to see if it should be restored. Note that resources *may* be in this
// list twice, i.e. once due to being a prioritized resource, and once due
// to being in the backup tarball. We can't de-dupe this upfront, because
// it's possible that items in the prioritized resources list may not be
// fully resolved group-resource strings (e.g. may be specified as "po"
// instead of "pods"), and we don't want to fully resolve them via discovery
// until we reach them in the loop, because it is possible that the
// resource/API itself is being restored via a custom resource definition,
// meaning it's not available via discovery prior to beginning the restore.
//
// Since we keep track of the fully-resolved group-resources that we *have* restored, we won't try to restore a
// resource twice even if it's in the ordered list twice.
// Since we keep track of the fully-resolved group-resources that we *have*
// restored, we won't try to restore a resource twice even if it's in the
// ordered list twice.
for _, resource := range getOrderedResources(ctx.resourcePriorities, backupResources) {
// try to resolve the resource via discovery to a complete group/version/resource
gvr, _, err := ctx.discoveryHelper.ResourceFor(schema.ParseGroupResource(resource).WithVersion(""))
@@ -1502,35 +1586,37 @@ func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[stri
}
groupResource := gvr.GroupResource()
// check if we've already restored this resource (this would happen if the resource
// we're currently looking at was already restored because it was a prioritized
// resource, and now we're looking at it as part of the backup contents).
// Check if we've already restored this resource (this would happen if
// the resource we're currently looking at was already restored because
// it was a prioritized resource, and now we're looking at it as part of
// the backup contents).
if processedResources.Has(groupResource.String()) {
ctx.log.WithField("resource", groupResource.String()).Debugf("Skipping restore of resource because it's already been processed")
continue
}
// check if the resource should be restored according to the resource includes/excludes
// Check if the resource should be restored according to the resource
// includes/excludes.
if !ctx.resourceIncludesExcludes.ShouldInclude(groupResource.String()) {
ctx.log.WithField("resource", groupResource.String()).Infof("Skipping restore of resource because the restore spec excludes it")
continue
}
// we don't want to explicitly restore namespace API objs because we'll handle
// We don't want to explicitly restore namespace API objs because we'll handle
// them as a special case prior to restoring anything into them
if groupResource == kuberesource.Namespaces {
continue
}
// check if the resource is present in the backup
// Check if the resource is present in the backup
resourceList := backupResources[groupResource.String()]
if resourceList == nil {
ctx.log.WithField("resource", groupResource.String()).Debugf("Skipping restore of resource because it's not present in the backup tarball")
continue
}
// iterate through each namespace that contains instances of the resource and
// append to the list of to-be restored resources
// Iterate through each namespace that contains instances of the
// resource and append to the list of to-be restored resources.
for namespace, items := range resourceList.ItemsByNamespace {
if namespace != "" && !ctx.namespaceIncludesExcludes.ShouldInclude(namespace) {
ctx.log.Infof("Skipping namespace %s", namespace)
@@ -1555,10 +1641,10 @@ func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[stri
}
res, w, e := ctx.getSelectedRestoreableItems(groupResource.String(), targetNamespace, namespace, items)
restoreResourceCollection = append(restoreResourceCollection, res)
warnings.Merge(&w)
errs.Merge(&e)
restoreResourceCollection = append(restoreResourceCollection, res)
}
// record that we've restored the resource
@@ -1567,14 +1653,18 @@ func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[stri
return restoreResourceCollection, warnings, errs
}
// getSelectedRestoreableItems applies Kubernetes selectors on individual items of each resource type to create
// a list of items which will be actually restored
// getSelectedRestoreableItems applies Kubernetes selectors on individual items
// of each resource type to create a list of items which will be actually
// restored.
func (ctx *restoreContext) getSelectedRestoreableItems(resource, targetNamespace, originalNamespace string, items []string) (restoreableResource, Result, Result) {
var res restoreableResource
warnings, errs := Result{}, Result{}
res.resource = resource
if res.selectedItemsByNamespace == nil {
res.selectedItemsByNamespace = make(map[string][]restoreableItem)
restorable := restoreableResource{
resource: resource,
}
if restorable.selectedItemsByNamespace == nil {
restorable.selectedItemsByNamespace = make(map[string][]restoreableItem)
}
if targetNamespace != "" {
@@ -1583,12 +1673,33 @@ func (ctx *restoreContext) getSelectedRestoreableItems(resource, targetNamespace
ctx.log.Infof("Resource '%s' will be restored at cluster scope", resource)
}
// If the APIGroupVersionsFeatureFlag is enabled, the item path will be
// updated to include the API group version that was chosen for restore. For
// example, for "horizontalpodautoscalers.autoscaling", if v2beta1 is chosen
// to be restored, then "horizontalpodautoscalers.autoscaling/v2beta1" will
// be part of item path. Different versions would only have been stored
// if the APIGroupVersionsFeatureFlag was enabled during backup. The
// chosenGrpVersToRestore map would only be populated if
// APIGroupVersionsFeatureFlag was enabled for restore and the minimum
// required backup format version has been met.
cgv, ok := ctx.chosenGrpVersToRestore[resource]
if ok {
resource = filepath.Join(resource, cgv.Dir)
}
for _, item := range items {
itemPath := archive.GetItemFilePath(ctx.restoreDir, resource, originalNamespace, item)
obj, err := archive.Unmarshal(ctx.fileSystem, itemPath)
if err != nil {
errs.Add(targetNamespace, fmt.Errorf("error decoding %q: %v", strings.Replace(itemPath, ctx.restoreDir+"/", "", -1), err))
errs.Add(
targetNamespace,
fmt.Errorf(
"error decoding %q: %v",
strings.Replace(itemPath, ctx.restoreDir+"/", "", -1),
err,
),
)
continue
}
@@ -1601,8 +1712,9 @@ func (ctx *restoreContext) getSelectedRestoreableItems(resource, targetNamespace
name: item,
targetNamespace: targetNamespace,
}
res.selectedItemsByNamespace[originalNamespace] = append(res.selectedItemsByNamespace[originalNamespace], selectedItem)
res.totalItems++
restorable.selectedItemsByNamespace[originalNamespace] =
append(restorable.selectedItemsByNamespace[originalNamespace], selectedItem)
restorable.totalItems++
}
return res, warnings, errs
return restorable, warnings, errs
}