From 583ef4258a57ac82cf4283a73dea2c2e73a3d636 Mon Sep 17 00:00:00 2001 From: Steve Kriss Date: Tue, 4 Feb 2020 14:33:12 -0700 Subject: [PATCH] refactor restore priorities code to use single loop and lazy discovery Signed-off-by: Steve Kriss --- changelogs/unreleased/2248-skriss | 1 + pkg/restore/restore.go | 211 ++++++++++-------------------- pkg/restore/restore_test.go | 129 +++++++----------- 3 files changed, 113 insertions(+), 228 deletions(-) create mode 100644 changelogs/unreleased/2248-skriss diff --git a/changelogs/unreleased/2248-skriss b/changelogs/unreleased/2248-skriss new file mode 100644 index 000000000..f43c478f6 --- /dev/null +++ b/changelogs/unreleased/2248-skriss @@ -0,0 +1 @@ +refactor restore code to lazily resolve resources via discovery and eliminate second restore loop for instances of restored CRDs diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go index aced1d887..6890c9924 100644 --- a/pkg/restore/restore.go +++ b/pkg/restore/restore.go @@ -98,64 +98,6 @@ type kubernetesRestorer struct { logger logrus.FieldLogger } -// prioritizeResources returns an ordered, fully-resolved list of resources to restore based on -// the provided discovery helper, resource priorities, and included/excluded resources. -func prioritizeResources(helper discovery.Helper, priorities []string, includedResources *collections.IncludesExcludes, logger logrus.FieldLogger) ([]schema.GroupResource, error) { - var ret []schema.GroupResource - - // set keeps track of resolved GroupResource names - set := sets.NewString() - - // start by resolving priorities into GroupResources and adding them to ret - for _, r := range priorities { - gvr, _, err := helper.ResourceFor(schema.ParseGroupResource(r).WithVersion("")) - if err != nil { - return nil, err - } - gr := gvr.GroupResource() - - if !includedResources.ShouldInclude(gr.String()) { - logger.WithField("groupResource", gr).Info("Not including resource") - continue - } - ret = append(ret, gr) - set.Insert(gr.String()) - } - - // go through everything we got from discovery and add anything not in "set" to byName - var byName []schema.GroupResource - for _, resourceGroup := range helper.Resources() { - // will be something like storage.k8s.io/v1 - groupVersion, err := schema.ParseGroupVersion(resourceGroup.GroupVersion) - if err != nil { - return nil, err - } - - for _, resource := range resourceGroup.APIResources { - gr := groupVersion.WithResource(resource.Name).GroupResource() - - if !includedResources.ShouldInclude(gr.String()) { - logger.WithField("groupResource", gr.String()).Info("Not including resource") - continue - } - - if !set.Has(gr.String()) { - byName = append(byName, gr) - } - } - } - - // sort byName by name - sort.Slice(byName, func(i, j int) bool { - return byName[i].String() < byName[j].String() - }) - - // combine prioritized with by-name - ret = append(ret, byName...) - - return ret, nil -} - // NewKubernetesRestorer creates a new kubernetesRestorer. 
func NewKubernetesRestorer( discoveryHelper discovery.Helper, @@ -213,10 +155,6 @@ func (kr *kubernetesRestorer) Restore( // get resource includes-excludes resourceIncludesExcludes := getResourceIncludesExcludes(kr.discoveryHelper, req.Restore.Spec.IncludedResources, req.Restore.Spec.ExcludedResources) - prioritizedResources, err := prioritizeResources(kr.discoveryHelper, kr.resourcePriorities, resourceIncludesExcludes, req.Log) - if err != nil { - return Result{}, Result{Velero: []string{err.Error()}} - } // get namespace includes-excludes namespaceIncludesExcludes := collections.NewIncludesExcludes(). @@ -265,7 +203,6 @@ func (kr *kubernetesRestorer) Restore( restore: req.Restore, resourceIncludesExcludes: resourceIncludesExcludes, namespaceIncludesExcludes: namespaceIncludesExcludes, - prioritizedResources: prioritizedResources, selector: selector, log: req.Log, dynamicFactory: kr.dynamicFactory, @@ -359,7 +296,6 @@ type context struct { restoreDir string resourceIncludesExcludes *collections.IncludesExcludes namespaceIncludesExcludes *collections.IncludesExcludes - prioritizedResources []schema.GroupResource selector labels.Selector log logrus.FieldLogger dynamicFactory client.DynamicFactory @@ -388,6 +324,21 @@ type resourceClientKey struct { namespace string } +// getOrderedResources returns an ordered list of resource identifiers to restore, based on the provided resource +// priorities and backup contents. The returned list begins with all of the prioritized resources (in order), and +// appends to that an alphabetized list of all resources in the backup. +func getOrderedResources(resourcePriorities []string, backupResources map[string]*archive.ResourceItems) []string { + // alphabetize resources in the backup + orderedBackupResources := make([]string, 0, len(backupResources)) + for resource := range backupResources { + orderedBackupResources = append(orderedBackupResources, resource) + } + sort.Strings(orderedBackupResources) + + // master list: everything in resource priorities, followed by what's in the backup (alphabetized) + return append(resourcePriorities, orderedBackupResources...) +} + func (ctx *context) execute() (Result, Result) { warnings, errs := Result{}, Result{} @@ -404,26 +355,65 @@ func (ctx *context) execute() (Result, Result) { // need to set this for additionalItems to be restored ctx.restoreDir = dir + var ( + existingNamespaces = sets.NewString() + processedResources = sets.NewString() + ) + backupResources, err := archive.NewParser(ctx.log, ctx.fileSystem).Parse(ctx.restoreDir) if err != nil { errs.AddVeleroError(errors.Wrap(err, "error parsing backup contents")) return warnings, errs } - existingNamespaces := sets.NewString() + // Iterate through an ordered list of resources to restore, checking each one to see if it should be restored. + // Note that resources *may* be in this list twice, i.e. once due to being a prioritized resource, and once due + // to being in the backup tarball. We can't de-dupe this upfront, because it's possible that items in the prioritized + // resources list may not be fully resolved group-resource strings (e.g. may be specfied as "po" instead of "pods"), + // and we don't want to fully resolve them via discovery until we reach them in the loop, because it is possible + // that the resource/API itself is being restored via a custom resource definition, meaning it's not available via + // discovery prior to beginning the restore. 
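For concreteness, a minimal sketch of what getOrderedResources produces when a prioritized shorthand and a backed-up resource overlap (the helper function and resource names below are illustrative, not part of this patch):

// Illustrative sketch, assumed to live alongside this code in package restore:
// the prioritized shorthand "po" and the backed-up "pods" entry both appear in
// the ordered list; that is expected, and de-duplication happens later in the
// restore loop once each entry has been resolved via discovery.
func exampleOrderedResources() []string {
	priorities := []string{"customresourcedefinitions", "namespaces", "po"}
	backupResources := map[string]*archive.ResourceItems{
		"deployments.apps": nil,
		"pods":             nil,
	}

	// returns ["customresourcedefinitions", "namespaces", "po",
	//          "deployments.apps", "pods"]
	return getOrderedResources(priorities, backupResources)
}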
+ // + // Since we keep track of the fully-resolved group-resources that we *have* restored, we won't try to restore a + // resource twice even if it's in the ordered list twice. + for _, resource := range getOrderedResources(ctx.resourcePriorities, backupResources) { + // try to resolve the resource via discovery to a complete group/version/resource + gvr, _, err := ctx.discoveryHelper.ResourceFor(schema.ParseGroupResource(resource).WithVersion("")) + if err != nil { + ctx.log.WithField("resource", resource).Infof("Skipping restore of resource because it cannot be resolved via discovery") + continue + } + groupResource := gvr.GroupResource() + + // check if we've already restored this resource (this would happen if the resource + // we're currently looking at was already restored because it was a prioritized + // resource, and now we're looking at it as part of the backup contents). + if processedResources.Has(groupResource.String()) { + ctx.log.WithField("resource", groupResource.String()).Debugf("Skipping restore of resource because it's already been processed") + continue + } + + // check if the resource should be restored according to the resource includes/excludes + if !ctx.resourceIncludesExcludes.ShouldInclude(groupResource.String()) { + ctx.log.WithField("resource", groupResource.String()).Infof("Skipping restore of resource because the restore spec excludes it") + continue + } - for _, resource := range ctx.prioritizedResources { // we don't want to explicitly restore namespace API objs because we'll handle // them as a special case prior to restoring anything into them - if resource == kuberesource.Namespaces { + if groupResource == kuberesource.Namespaces { continue } - resourceList := backupResources[resource.String()] + // check if the resource is present in the backup + resourceList := backupResources[groupResource.String()] if resourceList == nil { + ctx.log.WithField("resource", groupResource.String()).Debugf("Skipping restore of resource because it's not present in the backup tarball") continue } + // iterate through each namespace that contains instances of the resource and + // restore them for namespace, items := range resourceList.ItemsByNamespace { if namespace != "" && !ctx.namespaceIncludesExcludes.ShouldInclude(namespace) { ctx.log.Infof("Skipping namespace %s", namespace) @@ -454,88 +444,23 @@ func (ctx *context) execute() (Result, Result) { existingNamespaces.Insert(targetNamespace) } - w, e := ctx.restoreResource(resource.String(), targetNamespace, namespace, items) + w, e := ctx.restoreResource(groupResource.String(), targetNamespace, namespace, items) warnings.Merge(&w) errs.Merge(&e) } - } - // TODO: Re-order this logic so that CRs can be prioritized in the main loop, rather than after. + // record that we've restored the resource + processedResources.Insert(groupResource.String()) - // Refresh and resolve based on CRDs added to the API server from the above restore loop. - // This is because CRDs have been added to the API groups but until we refresh, Velero doesn't know about the - // newly-added API groups in order to create the CRs from them. - if err := ctx.discoveryHelper.Refresh(); err != nil { - // Don't break on error here, since newResources will be the same as the original prioritizedResources, - // and thus addedResources will end up being empty and we'll restore nothing. - // Since we're continuing the restore, add a warning, not an error. 
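The lazy resolution step at the top of this loop is what lets shorthand priorities and CRD-backed resources work. A hedged sketch of that call, assuming a populated discovery.Helper and a logrus.FieldLogger (the function itself is hypothetical, for illustration only):

// Resolving the shorthand "po" yields the canonical "pods" group-resource,
// which is the key recorded in processedResources; a resource whose API does
// not exist yet (e.g. its CRD has not been restored) simply fails to resolve
// and is skipped rather than failing the restore.
func resolveExample(helper discovery.Helper, log logrus.FieldLogger) {
	gvr, _, err := helper.ResourceFor(schema.ParseGroupResource("po").WithVersion(""))
	if err != nil {
		log.WithField("resource", "po").Info("cannot resolve resource via discovery; skipping")
		return
	}
	log.Infof("resolved %q to %q", "po", gvr.GroupResource().String()) // "pods"
}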
- warnings.AddVeleroError(errors.Wrap(err, "error refreshing discovery API")) - } - newResources, err := prioritizeResources(ctx.discoveryHelper, ctx.resourcePriorities, ctx.resourceIncludesExcludes, ctx.log) - if err != nil { - // If there was an error, then newResources will be nil, so we can continue on the restore. - // addedResources will end up being nil, but we should still report this failure. - warnings.AddVeleroError(errors.Wrap(err, "error sorting resources")) - } - - // Filter the resources to only those added since our first restore pass. - addedResources := make([]schema.GroupResource, 0) - for _, r := range newResources { - var found bool - for _, p := range ctx.prioritizedResources { - if r == p { - found = true - break + // if we just restored custom resource definitions (CRDs), refresh discovery + // because the restored CRDs may have created new APIs that didn't previously + // exist in the cluster, and we want to be able to resolve & restore instances + // of them in subsequent loop iterations. + if groupResource == kuberesource.CustomResourceDefinitions { + if err := ctx.discoveryHelper.Refresh(); err != nil { + warnings.Add("", errors.Wrap(err, "error refreshing discovery after restoring custom resource definitions")) } } - // Resource hasn't already been processed, so queue it for the next loop. - if !found { - ctx.log.Debugf("Discovered new resource %s", r) - addedResources = append(addedResources, r) - } - } - - // Use the same restore logic as above, but for newly available API groups (CRDs) - for _, resource := range addedResources { - resourceList := backupResources[resource.String()] - if resourceList == nil { - continue - } - - for namespace, items := range resourceList.ItemsByNamespace { - if namespace != "" && !ctx.namespaceIncludesExcludes.ShouldInclude(namespace) { - ctx.log.Infof("Skipping namespace %s", namespace) - continue - } - - // get target namespace to restore into, if different - // from source namespace - targetNamespace := namespace - if target, ok := ctx.restore.Spec.NamespaceMapping[namespace]; ok { - targetNamespace = target - } - - // if we don't know whether this namespace exists yet, attempt to create - // it in order to ensure it exists. Try to get it from the backup tarball - // (in order to get any backed-up metadata), but if we don't find it there, - // create a blank one. 
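Whether the discovery refresh above fires before any custom resource instance is reached depends on the priority list the restorer is configured with. A hypothetical ordering (illustrative only; the real defaults live in the Velero server configuration) that guarantees it, assuming CRDs are included in the restore:

// Because "customresourcedefinitions" is near the front, every CRD in the
// backup is restored -- and discovery refreshed -- before the alphabetical
// portion of the ordered list reaches instances of those custom resources,
// so they resolve successfully on the first and only pass.
var exampleResourcePriorities = []string{
	"customresourcedefinitions",
	"namespaces",
	"persistentvolumes",
	"persistentvolumeclaims",
	"secrets",
	"configmaps",
	"serviceaccounts",
	"pods",
}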
- if namespace != "" && !existingNamespaces.Has(targetNamespace) { - logger := ctx.log.WithField("namespace", namespace) - ns := getNamespace(logger, getItemFilePath(ctx.restoreDir, "namespaces", "", namespace), targetNamespace) - if _, err := kube.EnsureNamespaceExistsAndIsReady(ns, ctx.namespaceClient, ctx.resourceTerminatingTimeout); err != nil { - errs.AddVeleroError(err) - continue - } - - // keep track of namespaces that we know exist so we don't - // have to try to create them multiple times - existingNamespaces.Insert(targetNamespace) - } - - w, e := ctx.restoreResource(resource.String(), targetNamespace, namespace, items) - warnings.Merge(&w) - errs.Merge(&e) - } } // wait for all of the restic restore goroutines to be done, which is diff --git a/pkg/restore/restore_test.go b/pkg/restore/restore_test.go index 154365f32..57ee63b31 100644 --- a/pkg/restore/restore_test.go +++ b/pkg/restore/restore_test.go @@ -39,12 +39,11 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" - discoveryfake "k8s.io/client-go/discovery/fake" "k8s.io/client-go/dynamic" - kubefake "k8s.io/client-go/kubernetes/fake" kubetesting "k8s.io/client-go/testing" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/archive" "github.com/vmware-tanzu/velero/pkg/builder" "github.com/vmware-tanzu/velero/pkg/client" "github.com/vmware-tanzu/velero/pkg/discovery" @@ -55,7 +54,6 @@ import ( resticmocks "github.com/vmware-tanzu/velero/pkg/restic/mocks" "github.com/vmware-tanzu/velero/pkg/test" testutil "github.com/vmware-tanzu/velero/pkg/test" - "github.com/vmware-tanzu/velero/pkg/util/collections" "github.com/vmware-tanzu/velero/pkg/util/encode" kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube" "github.com/vmware-tanzu/velero/pkg/volume" @@ -2485,88 +2483,6 @@ func TestRestoreWithRestic(t *testing.T) { } } -func TestPrioritizeResources(t *testing.T) { - tests := []struct { - name string - apiResources map[string][]string - priorities []string - includes []string - excludes []string - expected []string - }{ - { - name: "priorities & ordering are correctly applied", - apiResources: map[string][]string{ - "v1": {"aaa", "bbb", "configmaps", "ddd", "namespaces", "ooo", "pods", "sss"}, - }, - priorities: []string{"namespaces", "configmaps", "pods"}, - includes: []string{"*"}, - expected: []string{"namespaces", "configmaps", "pods", "aaa", "bbb", "ddd", "ooo", "sss"}, - }, - { - name: "includes are correctly applied", - apiResources: map[string][]string{ - "v1": {"aaa", "bbb", "configmaps", "ddd", "namespaces", "ooo", "pods", "sss"}, - }, - priorities: []string{"namespaces", "configmaps", "pods"}, - includes: []string{"namespaces", "aaa", "sss"}, - expected: []string{"namespaces", "aaa", "sss"}, - }, - { - name: "excludes are correctly applied", - apiResources: map[string][]string{ - "v1": {"aaa", "bbb", "configmaps", "ddd", "namespaces", "ooo", "pods", "sss"}, - }, - priorities: []string{"namespaces", "configmaps", "pods"}, - includes: []string{"*"}, - excludes: []string{"ooo", "pods"}, - expected: []string{"namespaces", "configmaps", "aaa", "bbb", "ddd", "sss"}, - }, - } - - logger := testutil.NewLogger() - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - discoveryClient := &test.DiscoveryClient{ - FakeDiscovery: kubefake.NewSimpleClientset().Discovery().(*discoveryfake.FakeDiscovery), - } - - helper, err := discovery.NewHelper(discoveryClient, logger) - require.NoError(t, err) - - 
// add all the test case's API resources to the discovery client - for gvString, resources := range tc.apiResources { - gv, err := schema.ParseGroupVersion(gvString) - require.NoError(t, err) - - for _, resource := range resources { - discoveryClient.WithAPIResource(&test.APIResource{ - Group: gv.Group, - Version: gv.Version, - Name: resource, - }) - } - } - - require.NoError(t, helper.Refresh()) - - includesExcludes := collections.NewIncludesExcludes().Includes(tc.includes...).Excludes(tc.excludes...) - - result, err := prioritizeResources(helper, tc.priorities, includesExcludes, logger) - require.NoError(t, err) - - require.Equal(t, len(tc.expected), len(result)) - - for i := range result { - if e, a := tc.expected[i], result[i].Resource; e != a { - t.Errorf("index %d, expected %s, got %s", i, e, a) - } - } - }) - } -} - func TestResetMetadataAndStatus(t *testing.T) { tests := []struct { name string @@ -2681,6 +2597,49 @@ func TestGetItemFilePath(t *testing.T) { assert.Equal(t, "root/resources/resource/namespaces/namespace/item.json", res) } +func Test_getOrderedResources(t *testing.T) { + tests := []struct { + name string + resourcePriorities []string + backupResources map[string]*archive.ResourceItems + want []string + }{ + { + name: "when only priorities are specified, they're returned in order", + resourcePriorities: []string{"prio-3", "prio-2", "prio-1"}, + backupResources: nil, + want: []string{"prio-3", "prio-2", "prio-1"}, + }, + { + name: "when only backup resources are specified, they're returned in alphabetical order", + resourcePriorities: nil, + backupResources: map[string]*archive.ResourceItems{ + "backup-resource-3": nil, + "backup-resource-2": nil, + "backup-resource-1": nil, + }, + want: []string{"backup-resource-1", "backup-resource-2", "backup-resource-3"}, + }, + { + name: "when priorities and backup resources are specified, they're returned in the correct order", + resourcePriorities: []string{"prio-3", "prio-2", "prio-1"}, + backupResources: map[string]*archive.ResourceItems{ + "prio-3": nil, + "backup-resource-3": nil, + "backup-resource-2": nil, + "backup-resource-1": nil, + }, + want: []string{"prio-3", "prio-2", "prio-1", "backup-resource-1", "backup-resource-2", "backup-resource-3", "prio-3"}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.want, getOrderedResources(tc.resourcePriorities, tc.backupResources)) + }) + } +} + // assertResourceCreationOrder ensures that resources were created in the expected // order. Any resources *not* in resourcePriorities are required to come *after* all // resources in any order.