update after review

Signed-off-by: lou <alex1988@outlook.com>
Author: lou
Date: 2023-11-27 17:39:37 +08:00
parent ebb21303ab
commit 179faf3e33

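In short, the review feedback drops the createdItems counter that was threaded through processSelectedResource and restoreItem; execute() now decides whether to refresh discovery after the CRD pass by scanning ctx.restoredItems for entries that were created or updated, which also picks up items restored by plugins. A minimal standalone sketch of that check, with simplified stand-in types (assumptions for illustration only; the real ctx.restoredItems is keyed by an itemKey struct, not a string):

package main

import "fmt"

// Simplified stand-ins for Velero's internal types (not the real definitions).
type itemRestoreResult string

const (
	itemRestoreResultCreated itemRestoreResult = "created"
	itemRestoreResultUpdated itemRestoreResult = "updated"
	itemRestoreResultSkipped itemRestoreResult = "skipped"
)

type restoredItemStatus struct {
	action     itemRestoreResult
	itemExists bool
}

// createdOrUpdatedAny mirrors the loop the commit adds to execute(): it
// reports whether any restored item was created or updated, which is what
// now gates the discovery refresh after CRDs are restored.
func createdOrUpdatedAny(restoredItems map[string]restoredItemStatus) bool {
	for _, item := range restoredItems {
		if item.action == itemRestoreResultCreated || item.action == itemRestoreResultUpdated {
			return true
		}
	}
	return false
}

func main() {
	items := map[string]restoredItemStatus{
		"crd/backups.velero.io":  {action: itemRestoreResultCreated, itemExists: true},
		"crd/restores.velero.io": {action: itemRestoreResultSkipped, itemExists: true},
	}
	fmt.Println(createdOrUpdatedAny(items)) // true -> refresh discovery
}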

@@ -502,7 +502,7 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) {
}()
// totalItems: previously discovered items, i: iteration counter.
-totalItems, processedItems, createdItems, existingNamespaces := 0, 0, 0, sets.NewString()
+totalItems, processedItems, existingNamespaces := 0, 0, sets.NewString()
// First restore CRDs. This is needed so that they are available in the cluster
// when getOrderedResourceCollection is called again on the whole backup and
@@ -525,22 +525,29 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) {
var w, e results.Result
// Restore this resource, the update channel is set to nil, to avoid misleading value of "totalItems"
// more details see #5990
-processedItems, createdItems, w, e = ctx.processSelectedResource(
+processedItems, w, e = ctx.processSelectedResource(
selectedResource,
totalItems,
processedItems,
-createdItems,
existingNamespaces,
nil,
)
warnings.Merge(&w)
errs.Merge(&e)
}
+var createdOrUpdatedCRDs bool
+for _, restoredItem := range ctx.restoredItems {
+if restoredItem.action == itemRestoreResultCreated || restoredItem.action == itemRestoreResultUpdated {
+createdOrUpdatedCRDs = true
+break
+}
+}
// If we just restored custom resource definitions (CRDs), refresh
-// discovery because the restored CRDs may have created new APIs that
+// discovery because the restored CRDs may have created or updated new APIs that
// didn't previously exist in the cluster, and we want to be able to
// resolve & restore instances of them in subsequent loop iterations.
-if createdItems > 0 {
+if createdOrUpdatedCRDs {
if err := ctx.discoveryHelper.Refresh(); err != nil {
warnings.Add("", errors.Wrap(err, "refresh discovery after restoring CRDs"))
}
@@ -586,7 +593,6 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) {
// reset processedItems and totalItems before processing full resource list
processedItems = 0
totalItems = 0
-createdItems = 0
for _, selectedResource := range selectedResourceCollection {
totalItems += selectedResource.totalItems
}
@@ -594,11 +600,10 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) {
for _, selectedResource := range selectedResourceCollection {
var w, e results.Result
// Restore this resource
-processedItems, createdItems, w, e = ctx.processSelectedResource(
+processedItems, w, e = ctx.processSelectedResource(
selectedResource,
totalItems,
processedItems,
-createdItems,
existingNamespaces,
update,
)
@@ -682,10 +687,9 @@ func (ctx *restoreContext) processSelectedResource(
selectedResource restoreableResource,
totalItems int,
processedItems int,
-createdItems int,
existingNamespaces sets.String,
update chan progressUpdate,
-) (int, int, results.Result, results.Result) {
+) (int, results.Result, results.Result) {
warnings, errs := results.Result{}, results.Result{}
groupResource := schema.ParseGroupResource(selectedResource.resource)
@@ -741,15 +745,11 @@ func (ctx *restoreContext) processSelectedResource(
continue
}
-w, e, _, created := ctx.restoreItem(obj, groupResource, selectedItem.targetNamespace)
+w, e, _ := ctx.restoreItem(obj, groupResource, selectedItem.targetNamespace)
warnings.Merge(&w)
errs.Merge(&e)
processedItems++
-if created {
-createdItems++
-}
// totalItems keeps the count of items previously known. There
// may be additional items restored by plugins. We want to include
// the additional items by looking at restoredItems at the same
@@ -771,7 +771,7 @@ func (ctx *restoreContext) processSelectedResource(
}
}
-return processedItems, createdItems, warnings, errs
+return processedItems, warnings, errs
}
// getNamespace returns a namespace API object that we should attempt to
@@ -1091,9 +1091,10 @@ func (ctx *restoreContext) getResource(groupResource schema.GroupResource, obj *
return u, nil
}
-// itemExists bool is used to determine whether to include this item in the "wait for additional items" list
-// itemCreated indicates whether the item was created by this restore
-func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupResource schema.GroupResource, namespace string) (warnings, errs results.Result, itemExists, itemCreated bool) {
+func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupResource schema.GroupResource, namespace string) (results.Result, results.Result, bool) {
+warnings, errs := results.Result{}, results.Result{}
+// itemExists bool is used to determine whether to include this item in the "wait for additional items" list
+itemExists := false
resourceID := getResourceID(groupResource, namespace, obj.GetName())
// Check if group/resource should be restored. We need to do this here since
@@ -1105,7 +1106,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
"name": obj.GetName(),
"groupResource": groupResource.String(),
}).Info("Not restoring item because resource is excluded")
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
// Check if namespace/cluster-scoped resource should be restored. We need
@@ -1121,7 +1122,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
"name": obj.GetName(),
"groupResource": groupResource.String(),
}).Info("Not restoring item because namespace is excluded")
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
// If the namespace scoped resource should be restored, ensure that the
@@ -1131,7 +1132,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
_, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(nsToEnsure, ctx.namespaceClient, ctx.resourceTerminatingTimeout)
if err != nil {
errs.AddVeleroError(err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
// Add the newly created namespace to the list of restored items.
if nsCreated {
@@ -1149,7 +1150,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
"name": obj.GetName(),
"groupResource": groupResource.String(),
}).Info("Not restoring item because it's cluster-scoped")
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
}
@@ -1160,11 +1161,11 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
complete, err := isCompleted(obj, groupResource)
if err != nil {
errs.Add(namespace, fmt.Errorf("error checking completion of %q: %v", resourceID, err))
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
if complete {
ctx.log.Infof("%s is complete - skipping", kube.NamespaceAndName(obj))
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
name := obj.GetName()
@@ -1178,7 +1179,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
if prevRestoredItemStatus, exists := ctx.restoredItems[itemKey]; exists {
ctx.log.Infof("Skipping %s because it's already been restored.", resourceID)
itemExists = prevRestoredItemStatus.itemExists
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
ctx.restoredItems[itemKey] = restoredItemStatus{itemExists: itemExists}
defer func() {
@@ -1202,13 +1203,13 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
// to the interface.
if groupResource == kuberesource.Pods && obj.GetAnnotations()[v1.MirrorPodAnnotationKey] != "" {
ctx.log.Infof("Not restoring pod because it's a mirror pod")
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
resourceClient, err := ctx.getResourceClient(groupResource, obj, namespace)
if err != nil {
errs.AddVeleroError(fmt.Errorf("error getting resource client for namespace %q, resource %q: %v", namespace, &groupResource, err))
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
if groupResource == kuberesource.PersistentVolumes {
@@ -1218,7 +1219,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
shouldRenamePV, err := shouldRenamePV(ctx, obj, resourceClient)
if err != nil {
errs.Add(namespace, err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
// Check to see if the claimRef.namespace field needs to be remapped,
@@ -1226,7 +1227,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
_, err = remapClaimRefNS(ctx, obj)
if err != nil {
errs.Add(namespace, err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
var shouldRestoreSnapshot bool
@@ -1236,7 +1237,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
shouldRestoreSnapshot, err = ctx.shouldRestore(name, resourceClient)
if err != nil {
errs.Add(namespace, errors.Wrapf(err, "error waiting on in-cluster persistentvolume %s", name))
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
} else {
// If we're renaming the PV, we're going to give it a new random name,
@@ -1256,7 +1257,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
updatedObj, err := ctx.pvRestorer.executePVAction(obj)
if err != nil {
errs.Add(namespace, fmt.Errorf("error executing PVAction for %s: %v", resourceID, err))
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
obj = updatedObj
@@ -1273,7 +1274,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
pvName, err = ctx.pvRenamer(oldName)
if err != nil {
errs.Add(namespace, errors.Wrapf(err, "error renaming PV"))
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
} else {
// VolumeSnapshotter could have modified the PV name through
@@ -1300,7 +1301,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
// Return early because we don't want to restore the PV itself, we
// want to dynamically re-provision it.
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
case hasDeleteReclaimPolicy(obj.Object):
ctx.log.Infof("Dynamically re-provisioning persistent volume because it doesn't have a snapshot and its reclaim policy is Delete.")
@@ -1308,7 +1309,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
// Return early because we don't want to restore the PV itself, we
// want to dynamically re-provision it.
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
default:
ctx.log.Infof("Restoring persistent volume as-is because it doesn't have a snapshot and its reclaim policy is not Delete.")
@@ -1317,7 +1318,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
_, err = remapClaimRefNS(ctx, obj)
if err != nil {
errs.Add(namespace, err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
obj = resetVolumeBindingInfo(obj)
// We call the pvRestorer here to clear out the PV's claimRef.UID,
@@ -1325,7 +1326,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
updatedObj, err := ctx.pvRestorer.executePVAction(obj)
if err != nil {
errs.Add(namespace, fmt.Errorf("error executing PVAction for %s: %v", resourceID, err))
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
obj = updatedObj
}
@@ -1335,7 +1336,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
// Clear out non-core metadata fields and status.
if obj, err = resetMetadataAndStatus(obj); err != nil {
errs.Add(namespace, err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
ctx.log.Infof("restore status includes excludes: %+v", ctx.resourceStatusIncludesExcludes)
@@ -1360,7 +1361,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
})
if err != nil {
errs.Add(namespace, fmt.Errorf("error preparing %s: %v", resourceID, err))
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
// If async plugin started async operation, add it to the ItemOperations list
@@ -1389,12 +1390,12 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
}
if executeOutput.SkipRestore {
ctx.log.Infof("Skipping restore of %s: %v because a registered plugin discarded it", obj.GroupVersionKind().Kind, name)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
unstructuredObj, ok := executeOutput.UpdatedItem.(*unstructured.Unstructured)
if !ok {
errs.Add(namespace, fmt.Errorf("%s: unexpected type %T", resourceID, executeOutput.UpdatedItem))
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
obj = unstructuredObj
@@ -1427,7 +1428,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
}
}
-w, e, additionalItemExists, _ := ctx.restoreItem(additionalObj, additionalItem.GroupResource, additionalItemNamespace)
+w, e, additionalItemExists := ctx.restoreItem(additionalObj, additionalItem.GroupResource, additionalItemNamespace)
if additionalItemExists {
filteredAdditionalItems = append(filteredAdditionalItems, additionalItem)
}
@@ -1456,7 +1457,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
pvc := new(v1.PersistentVolumeClaim)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pvc); err != nil {
errs.Add(namespace, err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
if pvc.Spec.VolumeName != "" {
@@ -1475,7 +1476,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
ctx.log.Infof("Updating persistent volume claim %s/%s to reference renamed persistent volume (%s -> %s)", namespace, name, pvc.Spec.VolumeName, newName)
if err := unstructured.SetNestedField(obj.Object, newName, "spec", "volumeName"); err != nil {
errs.Add(namespace, err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
}
}
@@ -1506,7 +1507,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
resourceClient, err = ctx.getResourceClient(newGR, obj, obj.GetNamespace())
if err != nil {
errs.AddVeleroError(fmt.Errorf("error getting updated resource client for namespace %q, resource %q: %v", namespace, &groupResource, err))
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
ctx.log.Infof("Attempting to restore %s: %v", obj.GroupVersionKind().Kind, name)
@@ -1535,7 +1536,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
isAlreadyExistsError, err := isAlreadyExistsError(ctx, obj, restoreErr, resourceClient)
if err != nil {
errs.Add(namespace, err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
if restoreErr != nil {
@@ -1550,7 +1551,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
if err != nil && isAlreadyExistsError {
ctx.log.Warnf("Unable to retrieve in-cluster version of %s: %v, object won't be restored by velero or have restore labels, and existing resource policy is not applied", kube.NamespaceAndName(obj), err)
warnings.Add(namespace, err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
}
@@ -1564,7 +1565,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
if err != nil {
ctx.log.Infof("Error trying to reset metadata for %s: %v", kube.NamespaceAndName(obj), err)
warnings.Add(namespace, err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
// We know the object from the cluster won't have the backup/restore name
@@ -1580,20 +1581,20 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
if err != nil {
ctx.log.Infof("error merging secrets for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err)
warnings.Add(namespace, err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
patchBytes, err := generatePatch(fromCluster, desired)
if err != nil {
ctx.log.Infof("error generating patch for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err)
warnings.Add(namespace, err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
if patchBytes == nil {
// In-cluster and desired state are the same, so move on to
// the next item.
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
_, err = resourceClient.Patch(name, patchBytes)
@@ -1642,7 +1643,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
warnings.Add(namespace, e)
}
}
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
//update backup/restore labels on the unchanged resources if existingResourcePolicy is set as update
@@ -1658,24 +1659,22 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
}
ctx.log.Infof("Restore of %s, %v skipped: it already exists in the cluster and is the same as the backed up version", obj.GroupVersionKind().Kind, name)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
// Error was something other than an AlreadyExists.
if restoreErr != nil {
ctx.log.Errorf("error restoring %s: %+v", name, restoreErr)
errs.Add(namespace, fmt.Errorf("error restoring %s: %v", resourceID, restoreErr))
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
-itemCreated = true
shouldRestoreStatus := ctx.resourceStatusIncludesExcludes != nil && ctx.resourceStatusIncludesExcludes.ShouldInclude(groupResource.String())
if shouldRestoreStatus && statusFieldErr != nil {
err := fmt.Errorf("could not get status to be restored %s: %v", kube.NamespaceAndName(obj), statusFieldErr)
ctx.log.Errorf(err.Error())
errs.Add(namespace, err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
ctx.log.Debugf("status field for %s: exists: %v, should restore: %v", groupResource, statusFieldExists, shouldRestoreStatus)
// if it should restore status, run a UpdateStatus
@@ -1683,7 +1682,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
if err := unstructured.SetNestedField(obj.Object, objStatus, "status"); err != nil {
ctx.log.Errorf("could not set status field %s: %v", kube.NamespaceAndName(obj), err)
errs.Add(namespace, err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
obj.SetResourceVersion(createdObj.GetResourceVersion())
updated, err := resourceClient.UpdateStatus(obj, metav1.UpdateOptions{})
@@ -1702,14 +1701,14 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
if err != nil {
ctx.log.Errorf("error generating patch for managed fields %s: %v", kube.NamespaceAndName(obj), err)
errs.Add(namespace, err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
if patchBytes != nil {
if _, err = resourceClient.Patch(name, patchBytes); err != nil {
ctx.log.Errorf("error patch for managed fields %s: %v", kube.NamespaceAndName(obj), err)
if !apierrors.IsNotFound(err) {
errs.Add(namespace, err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
} else {
ctx.log.Infof("the managed fields for %s is patched", kube.NamespaceAndName(obj))
@@ -1720,7 +1719,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
pod := new(v1.Pod)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pod); err != nil {
errs.Add(namespace, err)
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
// Do not create podvolumerestore when current restore excludes pv/pvc
@@ -1746,7 +1745,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
}
}
-return warnings, errs, itemExists, itemCreated
+return warnings, errs, itemExists
}
func isAlreadyExistsError(ctx *restoreContext, obj *unstructured.Unstructured, err error, client client.Dynamic) (bool, error) {