From c8dfd648bbe85db0184ea53296de4220895497e6 Mon Sep 17 00:00:00 2001 From: codegold79 <17328443+codegold79@users.noreply.github.com> Date: Mon, 15 Mar 2021 15:51:07 -0700 Subject: [PATCH] Restore progress reporting bug fix (#3583) * Improve readbility and formatting of pkg/restore/restore.go Signed-off-by: F. Gold * Update paths to include API group versions Signed-off-by: F. Gold * Use full word, 'resource' instead of 'resrc' Signed-off-by: F. Gold --- pkg/restore/restore.go | 416 ++++++++++++++++++++++++++--------------- 1 file changed, 264 insertions(+), 152 deletions(-) diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go index d8ea0b2d0..4817dca38 100644 --- a/pkg/restore/restore.go +++ b/pkg/restore/restore.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "io/ioutil" + "path/filepath" "sort" "strings" "sync" @@ -175,10 +176,14 @@ func (kr *kubernetesRestorer) Restore( return Result{}, Result{Velero: []string{err.Error()}} } - // get resource includes-excludes - resourceIncludesExcludes := collections.GetResourceIncludesExcludes(kr.discoveryHelper, req.Restore.Spec.IncludedResources, req.Restore.Spec.ExcludedResources) + // Get resource includes-excludes. + resourceIncludesExcludes := collections.GetResourceIncludesExcludes( + kr.discoveryHelper, + req.Restore.Spec.IncludedResources, + req.Restore.Spec.ExcludedResources, + ) - // get namespace includes-excludes + // Get namespace includes-excludes. namespaceIncludesExcludes := collections.NewIncludesExcludes(). Includes(req.Restore.Spec.IncludedNamespaces...). Excludes(req.Restore.Spec.ExcludedNamespaces...) @@ -192,7 +197,10 @@ func (kr *kubernetesRestorer) Restore( if val := req.Restore.Annotations[velerov1api.PodVolumeOperationTimeoutAnnotation]; val != "" { parsed, err := time.ParseDuration(val) if err != nil { - req.Log.WithError(errors.WithStack(err)).Errorf("Unable to parse pod volume timeout annotation %s, using server value.", val) + req.Log.WithError(errors.WithStack(err)).Errorf( + "Unable to parse pod volume timeout annotation %s, using server value.", + val, + ) } else { podVolumeTimeout = parsed } @@ -352,9 +360,10 @@ type resourceClientKey struct { namespace string } -// getOrderedResources returns an ordered list of resource identifiers to restore, based on the provided resource -// priorities and backup contents. The returned list begins with all of the prioritized resources (in order), and -// appends to that an alphabetized list of all resources in the backup. +// getOrderedResources returns an ordered list of resource identifiers to restore, +// based on the provided resource priorities and backup contents. The returned list +// begins with all of the prioritized resources (in order), and appends to that +// an alphabetized list of all resources in the backup. func getOrderedResources(resourcePriorities []string, backupResources map[string]*archive.ResourceItems) []string { // alphabetize resources in the backup orderedBackupResources := make([]string, 0, len(backupResources)) @@ -363,7 +372,8 @@ func getOrderedResources(resourcePriorities []string, backupResources map[string } sort.Strings(orderedBackupResources) - // main list: everything in resource priorities, followed by what's in the backup (alphabetized) + // Main list: everything in resource priorities, followed by what's in the + // backup (alphabetized). return append(resourcePriorities, orderedBackupResources...) 
} @@ -380,7 +390,7 @@ func (ctx *restoreContext) execute() (Result, Result) { } defer ctx.fileSystem.RemoveAll(dir) - // need to set this for additionalItems to be restored + // Need to set this for additionalItems to be restored. ctx.restoreDir = dir backupResources, err := archive.NewParser(ctx.log, ctx.fileSystem).Parse(ctx.restoreDir) @@ -423,9 +433,21 @@ func (ctx *restoreContext) execute() (Result, Result) { lastUpdate = &val case <-ticker.C: if lastUpdate != nil { - patch := fmt.Sprintf(`{"status":{"progress":{"totalItems":%d,"itemsRestored":%d}}}`, lastUpdate.totalItems, lastUpdate.itemsRestored) - if _, err := ctx.restoreClient.Restores(ctx.restore.Namespace).Patch(go_context.TODO(), ctx.restore.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil { - ctx.log.WithError(errors.WithStack((err))).Warn("Got error trying to update restore's status.progress") + patch := fmt.Sprintf( + `{"status":{"progress":{"totalItems":%d,"itemsRestored":%d}}}`, + lastUpdate.totalItems, + lastUpdate.itemsRestored, + ) + _, err := ctx.restoreClient.Restores(ctx.restore.Namespace).Patch( + go_context.TODO(), + ctx.restore.Name, + types.MergePatchType, + []byte(patch), + metav1.PatchOptions{}, + ) + if err != nil { + ctx.log.WithError(errors.WithStack((err))). + Warn("Got error trying to update restore's status.progress") } lastUpdate = nil } @@ -433,85 +455,120 @@ func (ctx *restoreContext) execute() (Result, Result) { } }() - // i: iteration counter, totalItems: previously discovered items, + // totalItems: previously discovered items, i: iteration counter. totalItems, i, existingNamespaces := 0, 0, sets.NewString() for _, selectedResource := range selectedResourceCollection { totalItems += selectedResource.totalItems } + for _, selectedResource := range selectedResourceCollection { groupResource := schema.ParseGroupResource(selectedResource.resource) for namespace, selectedItems := range selectedResource.selectedItemsByNamespace { for _, selectedItem := range selectedItems { - // if we don't know whether this namespace exists yet, attempt to create + // If we don't know whether this namespace exists yet, attempt to create // it in order to ensure it exists. Try to get it from the backup tarball // (in order to get any backed-up metadata), but if we don't find it there, // create a blank one. if namespace != "" && !existingNamespaces.Has(selectedItem.targetNamespace) { logger := ctx.log.WithField("namespace", namespace) - ns := getNamespace(logger, archive.GetItemFilePath(ctx.restoreDir, "namespaces", "", namespace), selectedItem.targetNamespace) - if _, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(ns, ctx.namespaceClient, ctx.resourceTerminatingTimeout); err != nil { + + ns := getNamespace( + logger, + archive.GetItemFilePath(ctx.restoreDir, "namespaces", "", namespace), + selectedItem.targetNamespace, + ) + _, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady( + ns, + ctx.namespaceClient, + ctx.resourceTerminatingTimeout, + ) + if err != nil { errs.AddVeleroError(err) continue - } else { - // add the newly created namespace to the list of restored items - if nsCreated { - itemKey := velero.ResourceIdentifier{ - GroupResource: kuberesource.Namespaces, - Namespace: ns.Namespace, - Name: ns.Name, - } - ctx.restoredItems[itemKey] = struct{}{} - } } - // keep track of namespaces that we know exist so we don't - // have to try to create them multiple times + + // Add the newly created namespace to the list of restored items. 
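For context, the ticker-driven progress reporting that the hunk above reformats boils down to the following pattern; this is a minimal standalone sketch, with the Velero restore client call replaced by a hypothetical patchStatus stub (not part of the patch):

package main

import (
	"fmt"
	"time"
)

type progressUpdate struct {
	totalItems, itemsRestored int
}

// patchStatus stands in for restoreClient.Restores(ns).Patch(...) with a
// JSON merge patch; here it just prints the payload.
func patchStatus(patch string) {
	fmt.Println("PATCH", patch)
}

func main() {
	update := make(chan progressUpdate)
	quit := make(chan struct{})

	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()

		var lastUpdate *progressUpdate
		for {
			select {
			case <-quit:
				return
			case val := <-update:
				// Remember only the most recent counts between ticks.
				lastUpdate = &val
			case <-ticker.C:
				if lastUpdate != nil {
					patchStatus(fmt.Sprintf(
						`{"status":{"progress":{"totalItems":%d,"itemsRestored":%d}}}`,
						lastUpdate.totalItems, lastUpdate.itemsRestored,
					))
					lastUpdate = nil
				}
			}
		}
	}()

	for i := 1; i <= 5; i++ {
		update <- progressUpdate{totalItems: 5, itemsRestored: i}
		time.Sleep(300 * time.Millisecond)
	}
	quit <- struct{}{}
}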
+ if nsCreated { + itemKey := velero.ResourceIdentifier{ + GroupResource: kuberesource.Namespaces, + Namespace: ns.Namespace, + Name: ns.Name, + } + ctx.restoredItems[itemKey] = struct{}{} + } + + // Keep track of namespaces that we know exist so we don't + // have to try to create them multiple times. existingNamespaces.Insert(selectedItem.targetNamespace) } + obj, err := archive.Unmarshal(ctx.fileSystem, selectedItem.path) if err != nil { - errs.Add(selectedItem.targetNamespace, fmt.Errorf("error decoding %q: %v", strings.Replace(selectedItem.path, ctx.restoreDir+"/", "", -1), err)) + errs.Add( + selectedItem.targetNamespace, + fmt.Errorf( + "error decoding %q: %v", + strings.Replace(selectedItem.path, ctx.restoreDir+"/", "", -1), + err, + ), + ) continue } + w, e := ctx.restoreItem(obj, groupResource, selectedItem.targetNamespace) + warnings.Merge(&w) + errs.Merge(&e) i++ - // totalItems keeps the count of items previously known - // there may be additional items restored by plugins - // we want to include the additional items by looking at restoredItems - // at the same time, we don't want previously known items counted twice - // as they are present in both restoredItems and totalItems + + // totalItems keeps the count of items previously known. There + // may be additional items restored by plugins. We want to include + // the additional items by looking at restoredItems at the same + // time, we don't want previously known items counted twice as + // they are present in both restoredItems and totalItems. actualTotalItems := len(ctx.restoredItems) + (totalItems - i) update <- progressUpdate{ totalItems: actualTotalItems, itemsRestored: len(ctx.restoredItems), } - warnings.Merge(&w) - errs.Merge(&e) } } - // if we just restored custom resource definitions (CRDs), refresh discovery - // because the restored CRDs may have created new APIs that didn't previously - // exist in the cluster, and we want to be able to resolve & restore instances - // of them in subsequent loop iterations. + // If we just restored custom resource definitions (CRDs), refresh + // discovery because the restored CRDs may have created new APIs that + // didn't previously exist in the cluster, and we want to be able to + // resolve & restore instances of them in subsequent loop iterations. if groupResource == kuberesource.CustomResourceDefinitions { if err := ctx.discoveryHelper.Refresh(); err != nil { - warnings.Add("", errors.Wrap(err, "error refreshing discovery after restoring custom resource definitions")) + warnings.Add("", errors.Wrap(err, "refresh discovery after restoring CRDs")) } } } - // close the progress update channel + // Close the progress update channel. quit <- struct{}{} - // do a final progress update as stopping the ticker might have left last few updates from taking place - patch := fmt.Sprintf(`{"status":{"progress":{"totalItems":%d,"itemsRestored":%d}}}`, len(ctx.restoredItems), len(ctx.restoredItems)) - if _, err := ctx.restoreClient.Restores(ctx.restore.Namespace).Patch(go_context.TODO(), ctx.restore.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil { - ctx.log.WithError(errors.WithStack((err))).Warn("Got error trying to update restore's status.progress") + // Do a final progress update as stopping the ticker might have left last few + // updates from taking place. 
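The double-counting note above is easier to see with concrete numbers (illustrative only): 10 items known up front, 4 of them restored so far, plus 2 extra items added by restore item action plugins.

package main

import "fmt"

func main() {
	// Illustrative numbers only.
	knownTotal := 10   // totalItems discovered before the loop started
	processed := 4     // i: known items restored so far
	restoredSoFar := 6 // len(ctx.restoredItems): 4 known + 2 added by plugins

	// Already-restored known items are counted once, inside restoredSoFar;
	// the rest of the known items is (knownTotal - processed).
	actualTotalItems := restoredSoFar + (knownTotal - processed)
	fmt.Println(actualTotalItems) // 12 = 10 known + 2 plugin-added
}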
+ patch := fmt.Sprintf( + `{"status":{"progress":{"totalItems":%d,"itemsRestored":%d}}}`, + len(ctx.restoredItems), + len(ctx.restoredItems), + ) + + _, err = ctx.restoreClient.Restores(ctx.restore.Namespace).Patch( + go_context.TODO(), + ctx.restore.Name, + types.MergePatchType, + []byte(patch), + metav1.PatchOptions{}, + ) + if err != nil { + ctx.log.WithError(errors.WithStack((err))).Warn("Updating restore status.progress") } - // wait for all of the restic restore goroutines to be done, which is + // Wait for all of the restic restore goroutines to be done, which is // only possible once all of their errors have been received by the loop // below, then close the resticErrs channel so the loop terminates. go func() { @@ -522,18 +579,18 @@ func (ctx *restoreContext) execute() (Result, Result) { close(ctx.resticErrs) }() - // this loop will only terminate when the ctx.resticErrs channel is closed + // This loop will only terminate when the ctx.resticErrs channel is closed // in the above goroutine, *after* all errors from the goroutines have been // received by this loop. for err := range ctx.resticErrs { - // TODO not ideal to be adding these to Velero-level errors + // TODO: not ideal to be adding these to Velero-level errors // rather than a specific namespace, but don't have a way // to track the namespace right now. errs.Velero = append(errs.Velero, err.Error()) } ctx.log.Info("Done waiting for all restic restores to complete") - // wait for all post-restore exec hooks with same logic as restic wait above + // Wait for all post-restore exec hooks with same logic as restic wait above. go func() { ctx.log.Info("Waiting for all post-restore-exec hooks to complete") @@ -670,11 +727,12 @@ func (ctx *restoreContext) shouldRestore(name string, pvClient client.Dynamic) ( return false, nil } - // Check the namespace associated with the claimRef to see if it's deleting/terminating before proceeding + // Check the namespace associated with the claimRef to see if it's + // deleting/terminating before proceeding. ns, err := ctx.namespaceClient.Get(go_context.TODO(), namespace, metav1.GetOptions{}) if apierrors.IsNotFound(err) { pvLogger.Debugf("namespace %s for PV not found, waiting", namespace) - // namespace not found but the PV still exists, so continue to wait + // Namespace not found but the PV still exists, so continue to wait. return false, nil } if err != nil { @@ -683,7 +741,7 @@ func (ctx *restoreContext) shouldRestore(name string, pvClient client.Dynamic) ( if ns != nil && (ns.GetDeletionTimestamp() != nil || ns.Status.Phase == v1.NamespaceTerminating) { pvLogger.Debugf("namespace %s associated with PV is deleting, waiting", namespace) - // namespace is in the process of deleting, keep looping + // Namespace is in the process of deleting, keep looping. return false, nil } @@ -699,22 +757,25 @@ func (ctx *restoreContext) shouldRestore(name string, pvClient client.Dynamic) ( return shouldRestore, err } -// crdAvailable waits for a CRD to be available for use before letting the restore continue. +// crdAvailable waits for a CRD to be available for use before letting the +// restore continue. func (ctx *restoreContext) crdAvailable(name string, crdClient client.Dynamic) (bool, error) { crdLogger := ctx.log.WithField("crdName", name) var available bool - // Wait 1 minute rather than the standard resource timeout, since each CRD will transition fairly quickly + // Wait 1 minute rather than the standard resource timeout, since each CRD + // will transition fairly quickly. 
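The restic and hook error collection above relies on a common Go pattern: a goroutine waits on the WaitGroup and then closes the error channel, so the "for err := range" loop terminates only after every worker has sent its errors. A minimal standalone sketch with made-up worker errors:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	errCh := make(chan error)

	// Workers: in the patch these are the pod-volume-restore and exec-hook
	// goroutines; here each just reports one example error.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			// Done() is only called after the error has been sent.
			defer wg.Done()
			errCh <- fmt.Errorf("worker %d: example error", n)
		}(i)
	}

	// Close the channel only after every worker is done sending, so the
	// range loop below is guaranteed to terminate.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	for err := range errCh {
		fmt.Println("collected:", err)
	}
}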
err := wait.PollImmediate(time.Second, time.Minute*1, func() (bool, error) { unstructuredCRD, err := crdClient.Get(name, metav1.GetOptions{}) if err != nil { return true, err } - // TODO: Due to upstream conversion issues in runtime.FromUnstructured, we use the unstructured object here. - // Once the upstream conversion functions are fixed, we should convert to the CRD types and use IsCRDReady + // TODO: Due to upstream conversion issues in runtime.FromUnstructured, + // we use the unstructured object here. Once the upstream conversion + // functions are fixed, we should convert to the CRD types and use + // IsCRDReady. available, err = kube.IsUnstructuredCRDReady(unstructuredCRD) - if err != nil { return true, err } @@ -723,8 +784,8 @@ func (ctx *restoreContext) crdAvailable(name string, crdClient client.Dynamic) ( crdLogger.Debug("CRD not yet ready for use") } - // If the CRD is not available, keep polling (false, nil) - // If the CRD is available, break the poll and return back to caller (true, nil) + // If the CRD is not available, keep polling (false, nil). + // If the CRD is available, break the poll and return to caller (true, nil). return available, nil }) @@ -745,8 +806,8 @@ func (ctx *restoreContext) getResourceClient(groupResource schema.GroupResource, return client, nil } - // initialize client for this Resource. we need - // metadata from an object to do this. + // Initialize client for this resource. We need metadata from an object to + // do this. ctx.log.Infof("Getting client for %v", obj.GroupVersionKind()) resource := metav1.APIResource{ @@ -803,15 +864,15 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso return warnings, errs } - // if the namespace scoped resource should be restored, ensure that the namespace into - // which the resource is being restored into exists. + // If the namespace scoped resource should be restored, ensure that the + // namespace into which the resource is being restored into exists. // This is the *remapped* namespace that we are ensuring exists. nsToEnsure := getNamespace(ctx.log, archive.GetItemFilePath(ctx.restoreDir, "namespaces", "", obj.GetNamespace()), namespace) if _, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(nsToEnsure, ctx.namespaceClient, ctx.resourceTerminatingTimeout); err != nil { errs.AddVeleroError(err) return warnings, errs } else { - // add the newly created namespace to the list of restored items + // Add the newly created namespace to the list of restored items. if nsCreated { itemKey := velero.ResourceIdentifier{ GroupResource: kuberesource.Namespaces, @@ -832,8 +893,8 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } } - // make a copy of object retrieved from backup - // to make it available unchanged inside restore actions + // Make a copy of object retrieved from backup to make it available unchanged + //inside restore actions. itemFromBackup := obj.DeepCopy() complete, err := isCompleted(obj, groupResource) @@ -848,7 +909,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso name := obj.GetName() - // Check if we've already restored this + // Check if we've already restored this itemKey. 
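For reference, the polling contract used above: wait.PollImmediate runs the condition immediately and then once per interval; returning (false, nil) keeps polling, (true, nil) stops successfully, and a non-nil error aborts. A small sketch with a stubbed readiness check in place of kube.IsUnstructuredCRDReady:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	attempts := 0

	// Stand-in for "is the restored CRD Established and NamesAccepted yet?".
	ready := func() bool {
		attempts++
		return attempts >= 3
	}

	err := wait.PollImmediate(100*time.Millisecond, time.Minute, func() (bool, error) {
		if !ready() {
			// Not available yet: keep polling.
			return false, nil
		}
		// Available: stop polling and return to the caller.
		return true, nil
	})
	fmt.Println("polled", attempts, "times, err =", err)
}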
itemKey := velero.ResourceIdentifier{ GroupResource: groupResource, Namespace: namespace, @@ -860,7 +921,8 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } ctx.restoredItems[itemKey] = struct{}{} - // TODO: move to restore item action if/when we add a ShouldRestore() method to the interface + // TODO: move to restore item action if/when we add a ShouldRestore() method + // to the interface. if groupResource == kuberesource.Pods && obj.GetAnnotations()[v1.MirrorPodAnnotationKey] != "" { ctx.log.Infof("Not restoring pod because it's a mirror pod") return warnings, errs @@ -882,7 +944,8 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso return warnings, errs } - // Check to see if the claimRef.namespace field needs to be remapped, and do so if necessary. + // Check to see if the claimRef.namespace field needs to be remapped, + // and do so if necessary. _, err = remapClaimRefNS(ctx, obj) if err != nil { errs.Add(namespace, err) @@ -899,17 +962,18 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso return warnings, errs } } else { - // if we're renaming the PV, we're going to give it a new random name, + // If we're renaming the PV, we're going to give it a new random name, // so we can assume it doesn't already exist in the cluster and therefore // we should proceed with restoring from snapshot. shouldRestoreSnapshot = true } if shouldRestoreSnapshot { - // reset the PV's binding status so that Kubernetes can properly associate it with the restored PVC. + // Reset the PV's binding status so that Kubernetes can properly + // associate it with the restored PVC. obj = resetVolumeBindingInfo(obj) - // even if we're renaming the PV, obj still has the old name here, because the pvRestorer + // Even if we're renaming the PV, obj still has the old name here, because the pvRestorer // uses the original name to look up metadata about the snapshot. ctx.log.Infof("Restoring persistent volume from snapshot.") updatedObj, err := ctx.pvRestorer.executePVAction(obj) @@ -919,7 +983,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } obj = updatedObj - // VolumeSnapshotter has modified the PV name, we should rename the PV + // VolumeSnapshotter has modified the PV name, we should rename the PV. if oldName != obj.GetName() { shouldRenamePV = true } @@ -928,21 +992,22 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso if shouldRenamePV { var pvName string if oldName == obj.GetName() { - // pvRestorer hasn't modified the PV name, we need to rename the PV + // pvRestorer hasn't modified the PV name, we need to rename the PV. pvName, err = ctx.pvRenamer(oldName) if err != nil { errs.Add(namespace, errors.Wrapf(err, "error renaming PV")) return warnings, errs } } else { - // VolumeSnapshotter could have modified the PV name through function `SetVolumeID`, + // VolumeSnapshotter could have modified the PV name through + // function `SetVolumeID`, pvName = obj.GetName() } ctx.renamedPVs[oldName] = pvName obj.SetName(pvName) - // add the original PV name as an annotation + // Add the original PV name as an annotation. 
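The restoredItems bookkeeping above uses a comparable struct as a map key to form a set, which is what makes the "skip if already restored" check and the progress counts cheap. A minimal sketch with a hypothetical itemKey type mirroring velero.ResourceIdentifier:

package main

import "fmt"

// itemKey's fields are all comparable strings, so the struct can be used
// directly as a map key.
type itemKey struct {
	groupResource string
	namespace     string
	name          string
}

func main() {
	restoredItems := map[itemKey]struct{}{}

	restore := func(k itemKey) {
		if _, alreadyRestored := restoredItems[k]; alreadyRestored {
			fmt.Println("skipping, already restored:", k)
			return
		}
		restoredItems[k] = struct{}{}
		fmt.Println("restored:", k)
	}

	pod := itemKey{groupResource: "pods", namespace: "ns-1", name: "web-0"}
	restore(pod)
	restore(pod) // second attempt is skipped
	fmt.Println("itemsRestored:", len(restoredItems))
}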
annotations := obj.GetAnnotations() if annotations == nil { annotations = map[string]string{} @@ -955,22 +1020,24 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso ctx.log.Infof("Dynamically re-provisioning persistent volume because it has a restic backup to be restored.") ctx.pvsToProvision.Insert(name) - // return early because we don't want to restore the PV itself, we want to dynamically re-provision it. + // Return early because we don't want to restore the PV itself, we + // want to dynamically re-provision it. return warnings, errs case hasDeleteReclaimPolicy(obj.Object): ctx.log.Infof("Dynamically re-provisioning persistent volume because it doesn't have a snapshot and its reclaim policy is Delete.") ctx.pvsToProvision.Insert(name) - // return early because we don't want to restore the PV itself, we want to dynamically re-provision it. + // Return early because we don't want to restore the PV itself, we + // want to dynamically re-provision it. return warnings, errs default: ctx.log.Infof("Restoring persistent volume as-is because it doesn't have a snapshot and its reclaim policy is not Delete.") obj = resetVolumeBindingInfo(obj) - // we call the pvRestorer here to clear out the PV's claimRef.UID, so it can be re-claimed - // when its PVC is restored and gets a new UID. + // We call the pvRestorer here to clear out the PV's claimRef.UID, + // so it can be re-claimed when its PVC is restored and gets a new UID. updatedObj, err := ctx.pvRestorer.executePVAction(obj) if err != nil { errs.Add(namespace, fmt.Errorf("error executing PVAction for %s: %v", resourceID, err)) @@ -980,7 +1047,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } } - // clear out non-core metadata fields & status + // Clear out non-core metadata fields and status. if obj, err = resetMetadataAndStatus(obj); err != nil { errs.Add(namespace, err) return warnings, errs @@ -1084,16 +1151,16 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } } - // necessary because we may have remapped the namespace - // if the namespace is blank, don't create the key + // Necessary because we may have remapped the namespace if the namespace is + // blank, don't create the key. originalNamespace := obj.GetNamespace() if namespace != "" { obj.SetNamespace(namespace) } - // label the resource with the restore's name and the restored backup's name + // Label the resource with the restore's name and the restored backup's name // for easy identification of all cluster resources created by this restore - // and which backup they came from + // and which backup they came from. addRestoreLabels(obj, ctx.restore.Name, ctx.restore.Spec.BackupName) ctx.log.Infof("Attempting to restore %s: %v", obj.GroupVersionKind().Kind, name) @@ -1105,7 +1172,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso warnings.Add(namespace, err) return warnings, errs } - // Remove insubstantial metadata + // Remove insubstantial metadata. fromCluster, err = resetMetadataAndStatus(fromCluster) if err != nil { ctx.log.Infof("Error trying to reset metadata for %s: %v", kube.NamespaceAndName(obj), err) @@ -1113,8 +1180,8 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso return warnings, errs } - // We know the object from the cluster won't have the backup/restore name labels, so - // copy them from the object we attempted to restore. 
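The Delete-reclaim-policy branch above hinges on reading a nested field out of the unstructured PV. The sketch below shows one way to write that check; the real hasDeleteReclaimPolicy helper appears later in this patch and may differ in details:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func main() {
	// A stripped-down unstructured PersistentVolume, as it would come out of
	// the backup tarball.
	pv := map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "PersistentVolume",
		"spec": map[string]interface{}{
			"persistentVolumeReclaimPolicy": "Delete",
		},
	}

	policy, found, err := unstructured.NestedString(pv, "spec", "persistentVolumeReclaimPolicy")
	if err != nil || !found {
		fmt.Println("no reclaim policy found:", err)
		return
	}

	// Delete means the PV should be dynamically re-provisioned rather than
	// restored as-is, matching the decision in the hunk above.
	fmt.Println("re-provision dynamically:", policy == string(v1.PersistentVolumeReclaimDelete))
}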
+ // We know the object from the cluster won't have the backup/restore name + // labels, so copy them from the object we attempted to restore. labels := obj.GetLabels() addRestoreLabels(fromCluster, labels[velerov1api.RestoreNameLabel], labels[velerov1api.BackupNameLabel]) @@ -1136,7 +1203,8 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } if patchBytes == nil { - // In-cluster and desired state are the same, so move on to the next item + // In-cluster and desired state are the same, so move on to + // the next item. return warnings, errs } @@ -1157,7 +1225,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso return warnings, errs } - // Error was something other than an AlreadyExists + // Error was something other than an AlreadyExists. if restoreErr != nil { ctx.log.Errorf("error restoring %s: %+v", name, restoreErr) errs.Add(namespace, fmt.Errorf("error restoring %s: %v", resourceID, restoreErr)) @@ -1186,11 +1254,11 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso return warnings, errs } -// shouldRenamePV returns a boolean indicating whether a persistent volume should be given a new name -// before being restored, or an error if this cannot be determined. A persistent volume will be -// given a new name if and only if (a) a PV with the original name already exists in-cluster, and -// (b) in the backup, the PV is claimed by a PVC in a namespace that's being remapped during the -// restore. +// shouldRenamePV returns a boolean indicating whether a persistent volume should +// be given a new name before being restored, or an error if this cannot be determined. +// A persistent volume will be given a new name if and only if (a) a PV with the +// original name already exists in-cluster, and (b) in the backup, the PV is claimed +// by a PVC in a namespace that's being remapped during the restore. func shouldRenamePV(ctx *restoreContext, obj *unstructured.Unstructured, client client.Dynamic) (bool, error) { if len(ctx.restore.Spec.NamespaceMapping) == 0 { ctx.log.Debugf("Persistent volume does not need to be renamed because restore is not remapping any namespaces") @@ -1221,19 +1289,21 @@ func shouldRenamePV(ctx *restoreContext, obj *unstructured.Unstructured, client return false, errors.Wrapf(err, "error checking if persistent volume exists in the cluster") } - // no error returned: the PV was found in-cluster, so we need to rename it + // No error returned: the PV was found in-cluster, so we need to rename it. return true, nil } -// remapClaimRefNS remaps a PersistentVolume's claimRef.Namespace based on a restore's NamespaceMappings, if necessary. -// Returns true if the namespace was remapped, false if it was not required. +// remapClaimRefNS remaps a PersistentVolume's claimRef.Namespace based on a +// restore's NamespaceMappings, if necessary. Returns true if the namespace was +// remapped, false if it was not required. func remapClaimRefNS(ctx *restoreContext, obj *unstructured.Unstructured) (bool, error) { if len(ctx.restore.Spec.NamespaceMapping) == 0 { ctx.log.Debug("Persistent volume does not need to have the claimRef.namespace remapped because restore is not remapping any namespaces") return false, nil } - // Conversion to the real type here is more readable than all the error checking involved with reading each field individually. + // Conversion to the real type here is more readable than all the error checking + // involved with reading each field individually. 
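The "patchBytes == nil means in-cluster and desired state are the same" logic above compares the reset in-cluster object against the object we tried to restore. The patch's own generatePatch helper is not shown in this hunk; a two-way JSON merge patch via evanphx/json-patch is one way to express the same comparison, sketched here with hypothetical ConfigMap-like data:

package main

import (
	"encoding/json"
	"fmt"

	jsonpatch "github.com/evanphx/json-patch"
)

func main() {
	fromCluster := map[string]interface{}{
		"metadata": map[string]interface{}{"name": "cm-1"},
		"data":     map[string]interface{}{"key": "old"},
	}
	desired := map[string]interface{}{
		"metadata": map[string]interface{}{"name": "cm-1"},
		"data":     map[string]interface{}{"key": "new"},
	}

	clusterJSON, _ := json.Marshal(fromCluster)
	desiredJSON, _ := json.Marshal(desired)

	// "{}" means the two documents already match; otherwise the patch holds
	// only the fields that need to change.
	patch, err := jsonpatch.CreateMergePatch(clusterJSON, desiredJSON)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(patch)) // {"data":{"key":"new"}}
}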
pv := new(v1.PersistentVolume) if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, pv); err != nil { return false, errors.Wrapf(err, "error converting persistent volume to structured") @@ -1266,8 +1336,8 @@ func restorePodVolumeBackups(ctx *restoreContext, createdObj *unstructured.Unstr } else { ctx.resticWaitGroup.Add(1) go func() { - // Done() will only be called after all errors have been successfully sent - // on the ctx.resticErrs channel + // Done() will only be called after all errors have been successfully + // sent on the ctx.resticErrs channel defer ctx.resticWaitGroup.Done() pod := new(v1.Pod) @@ -1295,12 +1365,12 @@ func restorePodVolumeBackups(ctx *restoreContext, createdObj *unstructured.Unstr } } -// waitExec executes hooks in a restored pod's containers when they become ready +// waitExec executes hooks in a restored pod's containers when they become ready. func (ctx *restoreContext) waitExec(createdObj *unstructured.Unstructured) { ctx.hooksWaitGroup.Add(1) go func() { // Done() will only be called after all errors have been successfully sent - // on the ctx.resticErrs channel + // on the ctx.resticErrs channel. defer ctx.hooksWaitGroup.Done() pod := new(v1.Pod) @@ -1325,7 +1395,7 @@ func (ctx *restoreContext) waitExec(createdObj *unstructured.Unstructured) { ctx.hooksCancelFunc() for _, err := range errs { - // Errors are already logged in the HandleHooks method + // Errors are already logged in the HandleHooks method. ctx.hooksErrs <- err } } @@ -1373,24 +1443,31 @@ func hasDeleteReclaimPolicy(obj map[string]interface{}) bool { return policy == string(v1.PersistentVolumeReclaimDelete) } -// resetVolumeBindingInfo clears any necessary metadata out of a PersistentVolume or PersistentVolumeClaim that would make it ineligible to be re-bound by Velero. +// resetVolumeBindingInfo clears any necessary metadata out of a PersistentVolume +// or PersistentVolumeClaim that would make it ineligible to be re-bound by Velero. func resetVolumeBindingInfo(obj *unstructured.Unstructured) *unstructured.Unstructured { - // Clean out ClaimRef UID and resourceVersion, since this information is highly unique. + // Clean out ClaimRef UID and resourceVersion, since this information is + // highly unique. unstructured.RemoveNestedField(obj.Object, "spec", "claimRef", "uid") unstructured.RemoveNestedField(obj.Object, "spec", "claimRef", "resourceVersion") - // Clear out any annotations used by the Kubernetes PV controllers to track bindings. + // Clear out any annotations used by the Kubernetes PV controllers to track + // bindings. annotations := obj.GetAnnotations() - // Upon restore, this new PV will look like a statically provisioned, manually-bound volume rather than one bound by the controller, so remove the annotation that signals that a controller bound it. + // Upon restore, this new PV will look like a statically provisioned, manually- + // bound volume rather than one bound by the controller, so remove the annotation + // that signals that a controller bound it. delete(annotations, KubeAnnBindCompleted) - // Remove the annotation that signals that the PV is already bound; we want the PV(C) controller to take the two objects and bind them again. + // Remove the annotation that signals that the PV is already bound; we want + // the PV(C) controller to take the two objects and bind them again. 
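remapClaimRefNS above converts the unstructured PV to a typed object because reading claimRef fields that way is clearer than chains of nested-field lookups. A sketch of the round trip with made-up namespace mappings; the hunk only shows the forward conversion, so how the remapped value is written back is an assumption here:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
)

func main() {
	pvData := map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "PersistentVolume",
		"metadata":   map[string]interface{}{"name": "pv-1"},
		"spec": map[string]interface{}{
			"claimRef": map[string]interface{}{"namespace": "ns-old", "name": "pvc-1"},
		},
	}
	namespaceMapping := map[string]string{"ns-old": "ns-new"} // restore.Spec.NamespaceMapping

	// Unstructured -> typed: claimRef can now be read without error checking
	// every nested field.
	pv := new(v1.PersistentVolume)
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(pvData, pv); err != nil {
		panic(err)
	}

	if pv.Spec.ClaimRef != nil {
		if target, ok := namespaceMapping[pv.Spec.ClaimRef.Namespace]; ok {
			pv.Spec.ClaimRef.Namespace = target
		}
	}
	fmt.Println("claimRef now points at:", pv.Spec.ClaimRef.Namespace) // ns-new

	// Typed -> unstructured again, since the restore pipeline passes
	// unstructured objects end to end.
	remapped, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pv)
	if err != nil {
		panic(err)
	}
	_ = remapped
}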
delete(annotations, KubeAnnBoundByController) - // Remove the provisioned-by annotation which signals that the persistent volume was dynamically provisioned; it is now statically provisioned. + // Remove the provisioned-by annotation which signals that the persistent + // volume was dynamically provisioned; it is now statically provisioned. delete(annotations, KubeAnnDynamicallyProvisioned) - // GetAnnotations returns a copy, so we have to set them again + // GetAnnotations returns a copy, so we have to set them again. obj.SetAnnotations(annotations) return obj @@ -1420,8 +1497,8 @@ func resetMetadataAndStatus(obj *unstructured.Unstructured) (*unstructured.Unstr return obj, nil } -// addRestoreLabels labels the provided object with the restore name and -// the restored backup's name. +// addRestoreLabels labels the provided object with the restore name and the +// restored backup's name. func addRestoreLabels(obj metav1.Object, restoreName, backupName string) { labels := obj.GetLabels() @@ -1435,8 +1512,9 @@ func addRestoreLabels(obj metav1.Object, restoreName, backupName string) { obj.SetLabels(labels) } -// isCompleted returns whether or not an object is considered completed. -// Used to identify whether or not an object should be restored. Only Jobs or Pods are considered +// isCompleted returns whether or not an object is considered completed. Used to +// identify whether or not an object should be restored. Only Jobs or Pods are +// considered. func isCompleted(obj *unstructured.Unstructured, groupResource schema.GroupResource) (bool, error) { switch groupResource { case kuberesource.Pods: @@ -1457,42 +1535,48 @@ func isCompleted(obj *unstructured.Unstructured, groupResource schema.GroupResou return true, nil } } - // Assume any other resource isn't complete and can be restored + // Assume any other resource isn't complete and can be restored. return false, nil } -// restoreableResource represents map of individual items of -// each resource identifier grouped by their original namespaces +// restoreableResource represents map of individual items of each resource +// identifier grouped by their original namespaces. type restoreableResource struct { resource string selectedItemsByNamespace map[string][]restoreableItem totalItems int } -// restoreableItem represents an item by its target namespace -// contains enough information required to restore the item +// restoreableItem represents an item by its target namespace contains enough +// information required to restore the item. type restoreableItem struct { path string targetNamespace string name string } -// getOrderedResourceCollection iterates over list of ordered resource idenitifiers, applies resource include/exclude criteria, -// and Kubernetes selectors to make a list of resources to be actually restored preserving the original order +// getOrderedResourceCollection iterates over list of ordered resource +// identifiers, applies resource include/exclude criteria, and Kubernetes +// selectors to make a list of resources to be actually restored preserving the +// original order. func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[string]*archive.ResourceItems) ([]restoreableResource, Result, Result) { var warnings, errs Result processedResources := sets.NewString() restoreResourceCollection := make([]restoreableResource, 0) - // Iterate through an ordered list of resources to restore, checking each one to see if it should be restored. - // Note that resources *may* be in this list twice, i.e. 
once due to being a prioritized resource, and once due - // to being in the backup tarball. We can't de-dupe this upfront, because it's possible that items in the prioritized - // resources list may not be fully resolved group-resource strings (e.g. may be specified as "po" instead of "pods"), - // and we don't want to fully resolve them via discovery until we reach them in the loop, because it is possible - // that the resource/API itself is being restored via a custom resource definition, meaning it's not available via - // discovery prior to beginning the restore. + // Iterate through an ordered list of resources to restore, checking each + // one to see if it should be restored. Note that resources *may* be in this + // list twice, i.e. once due to being a prioritized resource, and once due + // to being in the backup tarball. We can't de-dupe this upfront, because + // it's possible that items in the prioritized resources list may not be + // fully resolved group-resource strings (e.g. may be specified as "po" + // instead of "pods"), and we don't want to fully resolve them via discovery + // until we reach them in the loop, because it is possible that the + // resource/API itself is being restored via a custom resource definition, + // meaning it's not available via discovery prior to beginning the restore. // - // Since we keep track of the fully-resolved group-resources that we *have* restored, we won't try to restore a - // resource twice even if it's in the ordered list twice. + // Since we keep track of the fully-resolved group-resources that we *have* + // restored, we won't try to restore a resource twice even if it's in the + // ordered list twice. for _, resource := range getOrderedResources(ctx.resourcePriorities, backupResources) { // try to resolve the resource via discovery to a complete group/version/resource gvr, _, err := ctx.discoveryHelper.ResourceFor(schema.ParseGroupResource(resource).WithVersion("")) @@ -1502,35 +1586,37 @@ func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[stri } groupResource := gvr.GroupResource() - // check if we've already restored this resource (this would happen if the resource - // we're currently looking at was already restored because it was a prioritized - // resource, and now we're looking at it as part of the backup contents). + // Check if we've already restored this resource (this would happen if + // the resource we're currently looking at was already restored because + // it was a prioritized resource, and now we're looking at it as part of + // the backup contents). if processedResources.Has(groupResource.String()) { ctx.log.WithField("resource", groupResource.String()).Debugf("Skipping restore of resource because it's already been processed") continue } - // check if the resource should be restored according to the resource includes/excludes + // Check if the resource should be restored according to the resource + // includes/excludes. 
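The de-duplication described above (a resource can appear once as a priority and once from the backup, but must only be processed once) comes down to a string set keyed by the fully resolved group-resource. A minimal sketch; the discovery resolution step is faked with a plain map:

package main

import (
	"fmt"
	"sort"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// Hypothetical stand-in for discovery: short names resolve to full
	// group-resource strings.
	resolve := map[string]string{"po": "pods", "pods": "pods", "pv": "persistentvolumes"}

	priorities := []string{"pv", "po"}
	backupResources := []string{"pods", "secrets"}
	sort.Strings(backupResources)

	// Priorities first, then everything found in the backup (alphabetized);
	// "pods" intentionally appears twice.
	ordered := append(priorities, backupResources...)

	processed := sets.NewString()
	for _, r := range ordered {
		gr, ok := resolve[r]
		if !ok {
			gr = r
		}
		if processed.Has(gr) {
			fmt.Println("already processed, skipping:", r)
			continue
		}
		// ... restore all items of this resource here ...
		processed.Insert(gr)
		fmt.Println("processed:", gr)
	}
}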
if !ctx.resourceIncludesExcludes.ShouldInclude(groupResource.String()) { ctx.log.WithField("resource", groupResource.String()).Infof("Skipping restore of resource because the restore spec excludes it") continue } - // we don't want to explicitly restore namespace API objs because we'll handle + // We don't want to explicitly restore namespace API objs because we'll handle // them as a special case prior to restoring anything into them if groupResource == kuberesource.Namespaces { continue } - // check if the resource is present in the backup + // Check if the resource is present in the backup resourceList := backupResources[groupResource.String()] if resourceList == nil { ctx.log.WithField("resource", groupResource.String()).Debugf("Skipping restore of resource because it's not present in the backup tarball") continue } - // iterate through each namespace that contains instances of the resource and - // append to the list of to-be restored resources + // Iterate through each namespace that contains instances of the + // resource and append to the list of to-be restored resources. for namespace, items := range resourceList.ItemsByNamespace { if namespace != "" && !ctx.namespaceIncludesExcludes.ShouldInclude(namespace) { ctx.log.Infof("Skipping namespace %s", namespace) @@ -1555,10 +1641,10 @@ func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[stri } res, w, e := ctx.getSelectedRestoreableItems(groupResource.String(), targetNamespace, namespace, items) - restoreResourceCollection = append(restoreResourceCollection, res) - warnings.Merge(&w) errs.Merge(&e) + + restoreResourceCollection = append(restoreResourceCollection, res) } // record that we've restored the resource @@ -1567,14 +1653,18 @@ func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[stri return restoreResourceCollection, warnings, errs } -// getSelectedRestoreableItems applies Kubernetes selectors on individual items of each resource type to create -// a list of items which will be actually restored +// getSelectedRestoreableItems applies Kubernetes selectors on individual items +// of each resource type to create a list of items which will be actually +// restored. func (ctx *restoreContext) getSelectedRestoreableItems(resource, targetNamespace, originalNamespace string, items []string) (restoreableResource, Result, Result) { - var res restoreableResource warnings, errs := Result{}, Result{} - res.resource = resource - if res.selectedItemsByNamespace == nil { - res.selectedItemsByNamespace = make(map[string][]restoreableItem) + + restorable := restoreableResource{ + resource: resource, + } + + if restorable.selectedItemsByNamespace == nil { + restorable.selectedItemsByNamespace = make(map[string][]restoreableItem) } if targetNamespace != "" { @@ -1583,12 +1673,33 @@ func (ctx *restoreContext) getSelectedRestoreableItems(resource, targetNamespace ctx.log.Infof("Resource '%s' will be restored at cluster scope", resource) } + // If the APIGroupVersionsFeatureFlag is enabled, the item path will be + // updated to include the API group version that was chosen for restore. For + // example, for "horizontalpodautoscalers.autoscaling", if v2beta1 is chosen + // to be restored, then "horizontalpodautoscalers.autoscaling/v2beta1" will + // be part of item path. Different versions would only have been stored + // if the APIGroupVersionsFeatureFlag was enabled during backup. 
The + // chosenGrpVersToRestore map would only be populated if + // APIGroupVersionsFeatureFlag was enabled for restore and the minimum + // required backup format version has been met. + cgv, ok := ctx.chosenGrpVersToRestore[resource] + if ok { + resource = filepath.Join(resource, cgv.Dir) + } + for _, item := range items { itemPath := archive.GetItemFilePath(ctx.restoreDir, resource, originalNamespace, item) obj, err := archive.Unmarshal(ctx.fileSystem, itemPath) if err != nil { - errs.Add(targetNamespace, fmt.Errorf("error decoding %q: %v", strings.Replace(itemPath, ctx.restoreDir+"/", "", -1), err)) + errs.Add( + targetNamespace, + fmt.Errorf( + "error decoding %q: %v", + strings.Replace(itemPath, ctx.restoreDir+"/", "", -1), + err, + ), + ) continue } @@ -1601,8 +1712,9 @@ func (ctx *restoreContext) getSelectedRestoreableItems(resource, targetNamespace name: item, targetNamespace: targetNamespace, } - res.selectedItemsByNamespace[originalNamespace] = append(res.selectedItemsByNamespace[originalNamespace], selectedItem) - res.totalItems++ + restorable.selectedItemsByNamespace[originalNamespace] = + append(restorable.selectedItemsByNamespace[originalNamespace], selectedItem) + restorable.totalItems++ } - return res, warnings, errs + return restorable, warnings, errs }
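The path handling described in that comment, when the APIGroupVersionsFeatureFlag applies, just inserts the chosen version directory into the resource segment of the item path. A small sketch with made-up names; the exact directory layout produced by archive.GetItemFilePath is not shown in this hunk, so the joined path below is only an approximation:

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	resource := "horizontalpodautoscalers.autoscaling"

	// Hypothetical: the group version chosen for restore (cgv.Dir in the patch).
	chosenVersionDir := "v2beta1"

	// With the feature flag, the version directory becomes part of the
	// resource path, exactly as the comment describes.
	withVersion := filepath.Join(resource, chosenVersionDir)
	fmt.Println(withVersion) // horizontalpodautoscalers.autoscaling/v2beta1

	// That joined value is then used in place of the bare resource name when
	// building item paths (illustrative layout, not exact):
	itemPath := filepath.Join("/tmp/restore-dir", "resources", withVersion, "namespaces", "ns-1", "my-hpa.json")
	fmt.Println(itemPath)
}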