Mirror of https://github.com/vmware-tanzu/velero.git
Merge pull request #2248 from skriss/refactor-restore-priorities
refactor restore priorities code to use single loop and lazy discovery
changelogs/unreleased/2248-skriss (new file, 1 line)
@@ -0,0 +1 @@
+refactor restore code to lazily resolve resources via discovery and eliminate second restore loop for instances of restored CRDs
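The core of the change is that a resource name from the priorities list is no longer resolved up front; it is resolved through the discovery API only when the restore loop reaches it, so that APIs created mid-restore by restored CRDs can still be found. A minimal, hypothetical sketch of that deferred-resolution step (resolveLazily is illustrative and not part of this PR; discovery.Helper.ResourceFor and schema.ParseGroupResource are the same calls used in the diff below):

import (
    "k8s.io/apimachinery/pkg/runtime/schema"

    "github.com/vmware-tanzu/velero/pkg/discovery"
)

// resolveLazily resolves a possibly-shorthand resource name (e.g. "po") to a
// fully-qualified group-resource at the moment it is needed. If the API isn't
// known to the cluster yet (for example, its CRD hasn't been restored), it
// reports ok=false so the caller can skip the resource instead of failing.
func resolveLazily(helper discovery.Helper, name string) (schema.GroupResource, bool) {
    gvr, _, err := helper.ResourceFor(schema.ParseGroupResource(name).WithVersion(""))
    if err != nil {
        return schema.GroupResource{}, false
    }
    return gvr.GroupResource(), true
}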
pkg/restore/restore.go

@@ -98,64 +98,6 @@ type kubernetesRestorer struct {
     logger logrus.FieldLogger
 }

-// prioritizeResources returns an ordered, fully-resolved list of resources to restore based on
-// the provided discovery helper, resource priorities, and included/excluded resources.
-func prioritizeResources(helper discovery.Helper, priorities []string, includedResources *collections.IncludesExcludes, logger logrus.FieldLogger) ([]schema.GroupResource, error) {
-    var ret []schema.GroupResource
-
-    // set keeps track of resolved GroupResource names
-    set := sets.NewString()
-
-    // start by resolving priorities into GroupResources and adding them to ret
-    for _, r := range priorities {
-        gvr, _, err := helper.ResourceFor(schema.ParseGroupResource(r).WithVersion(""))
-        if err != nil {
-            return nil, err
-        }
-        gr := gvr.GroupResource()
-
-        if !includedResources.ShouldInclude(gr.String()) {
-            logger.WithField("groupResource", gr).Info("Not including resource")
-            continue
-        }
-        ret = append(ret, gr)
-        set.Insert(gr.String())
-    }
-
-    // go through everything we got from discovery and add anything not in "set" to byName
-    var byName []schema.GroupResource
-    for _, resourceGroup := range helper.Resources() {
-        // will be something like storage.k8s.io/v1
-        groupVersion, err := schema.ParseGroupVersion(resourceGroup.GroupVersion)
-        if err != nil {
-            return nil, err
-        }
-
-        for _, resource := range resourceGroup.APIResources {
-            gr := groupVersion.WithResource(resource.Name).GroupResource()
-
-            if !includedResources.ShouldInclude(gr.String()) {
-                logger.WithField("groupResource", gr.String()).Info("Not including resource")
-                continue
-            }
-
-            if !set.Has(gr.String()) {
-                byName = append(byName, gr)
-            }
-        }
-    }
-
-    // sort byName by name
-    sort.Slice(byName, func(i, j int) bool {
-        return byName[i].String() < byName[j].String()
-    })
-
-    // combine prioritized with by-name
-    ret = append(ret, byName...)
-
-    return ret, nil
-}
-
 // NewKubernetesRestorer creates a new kubernetesRestorer.
 func NewKubernetesRestorer(
     discoveryHelper discovery.Helper,
@@ -213,10 +155,6 @@ func (kr *kubernetesRestorer) Restore(

     // get resource includes-excludes
     resourceIncludesExcludes := getResourceIncludesExcludes(kr.discoveryHelper, req.Restore.Spec.IncludedResources, req.Restore.Spec.ExcludedResources)
-    prioritizedResources, err := prioritizeResources(kr.discoveryHelper, kr.resourcePriorities, resourceIncludesExcludes, req.Log)
-    if err != nil {
-        return Result{}, Result{Velero: []string{err.Error()}}
-    }

     // get namespace includes-excludes
     namespaceIncludesExcludes := collections.NewIncludesExcludes().
@@ -265,7 +203,6 @@ func (kr *kubernetesRestorer) Restore(
         restore: req.Restore,
         resourceIncludesExcludes: resourceIncludesExcludes,
         namespaceIncludesExcludes: namespaceIncludesExcludes,
-        prioritizedResources: prioritizedResources,
         selector: selector,
         log: req.Log,
         dynamicFactory: kr.dynamicFactory,
@@ -359,7 +296,6 @@ type context struct {
     restoreDir string
     resourceIncludesExcludes *collections.IncludesExcludes
     namespaceIncludesExcludes *collections.IncludesExcludes
-    prioritizedResources []schema.GroupResource
     selector labels.Selector
     log logrus.FieldLogger
     dynamicFactory client.DynamicFactory
@@ -388,6 +324,21 @@ type resourceClientKey struct {
     namespace string
 }

+// getOrderedResources returns an ordered list of resource identifiers to restore, based on the provided resource
+// priorities and backup contents. The returned list begins with all of the prioritized resources (in order), and
+// appends to that an alphabetized list of all resources in the backup.
+func getOrderedResources(resourcePriorities []string, backupResources map[string]*archive.ResourceItems) []string {
+    // alphabetize resources in the backup
+    orderedBackupResources := make([]string, 0, len(backupResources))
+    for resource := range backupResources {
+        orderedBackupResources = append(orderedBackupResources, resource)
+    }
+    sort.Strings(orderedBackupResources)
+
+    // master list: everything in resource priorities, followed by what's in the backup (alphabetized)
+    return append(resourcePriorities, orderedBackupResources...)
+}
+
 func (ctx *context) execute() (Result, Result) {
     warnings, errs := Result{}, Result{}

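For illustration, a hypothetical call of the new helper (the map values don't affect ordering, so nil entries are fine here; the resource names are made-up examples):

ordered := getOrderedResources(
    []string{"customresourcedefinitions", "namespaces", "pods"},
    map[string]*archive.ResourceItems{
        "pods":             nil,
        "deployments.apps": nil,
    },
)
// ordered == ["customresourcedefinitions", "namespaces", "pods", "deployments.apps", "pods"]

Note that "pods" appears twice, once as a priority and once from the backup contents; the restore loop below de-duplicates by tracking the fully-resolved group-resources it has already processed.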
@@ -404,26 +355,65 @@ func (ctx *context) execute() (Result, Result) {
     // need to set this for additionalItems to be restored
     ctx.restoreDir = dir

+    var (
+        existingNamespaces = sets.NewString()
+        processedResources = sets.NewString()
+    )
+
     backupResources, err := archive.NewParser(ctx.log, ctx.fileSystem).Parse(ctx.restoreDir)
     if err != nil {
         errs.AddVeleroError(errors.Wrap(err, "error parsing backup contents"))
         return warnings, errs
     }

-    existingNamespaces := sets.NewString()
-
-    for _, resource := range ctx.prioritizedResources {
+    // Iterate through an ordered list of resources to restore, checking each one to see if it should be restored.
+    // Note that resources *may* be in this list twice, i.e. once due to being a prioritized resource, and once due
+    // to being in the backup tarball. We can't de-dupe this upfront, because it's possible that items in the prioritized
+    // resources list may not be fully resolved group-resource strings (e.g. may be specified as "po" instead of "pods"),
+    // and we don't want to fully resolve them via discovery until we reach them in the loop, because it is possible
+    // that the resource/API itself is being restored via a custom resource definition, meaning it's not available via
+    // discovery prior to beginning the restore.
+    //
+    // Since we keep track of the fully-resolved group-resources that we *have* restored, we won't try to restore a
+    // resource twice even if it's in the ordered list twice.
+    for _, resource := range getOrderedResources(ctx.resourcePriorities, backupResources) {
+        // try to resolve the resource via discovery to a complete group/version/resource
+        gvr, _, err := ctx.discoveryHelper.ResourceFor(schema.ParseGroupResource(resource).WithVersion(""))
+        if err != nil {
+            ctx.log.WithField("resource", resource).Infof("Skipping restore of resource because it cannot be resolved via discovery")
+            continue
+        }
+        groupResource := gvr.GroupResource()
+
+        // check if we've already restored this resource (this would happen if the resource
+        // we're currently looking at was already restored because it was a prioritized
+        // resource, and now we're looking at it as part of the backup contents).
+        if processedResources.Has(groupResource.String()) {
+            ctx.log.WithField("resource", groupResource.String()).Debugf("Skipping restore of resource because it's already been processed")
+            continue
+        }
+
+        // check if the resource should be restored according to the resource includes/excludes
+        if !ctx.resourceIncludesExcludes.ShouldInclude(groupResource.String()) {
+            ctx.log.WithField("resource", groupResource.String()).Infof("Skipping restore of resource because the restore spec excludes it")
+            continue
+        }
+
         // we don't want to explicitly restore namespace API objs because we'll handle
         // them as a special case prior to restoring anything into them
-        if resource == kuberesource.Namespaces {
+        if groupResource == kuberesource.Namespaces {
             continue
         }

-        resourceList := backupResources[resource.String()]
+        // check if the resource is present in the backup
+        resourceList := backupResources[groupResource.String()]
         if resourceList == nil {
+            ctx.log.WithField("resource", groupResource.String()).Debugf("Skipping restore of resource because it's not present in the backup tarball")
             continue
         }

+        // iterate through each namespace that contains instances of the resource and
+        // restore them
         for namespace, items := range resourceList.ItemsByNamespace {
             if namespace != "" && !ctx.namespaceIncludesExcludes.ShouldInclude(namespace) {
                 ctx.log.Infof("Skipping namespace %s", namespace)
@@ -454,88 +444,23 @@ func (ctx *context) execute() (Result, Result) {
                 existingNamespaces.Insert(targetNamespace)
             }

-            w, e := ctx.restoreResource(resource.String(), targetNamespace, namespace, items)
+            w, e := ctx.restoreResource(groupResource.String(), targetNamespace, namespace, items)
             warnings.Merge(&w)
             errs.Merge(&e)
         }
-    }

-    // TODO: Re-order this logic so that CRs can be prioritized in the main loop, rather than after.
-    // Refresh and resolve based on CRDs added to the API server from the above restore loop.
-    // This is because CRDs have been added to the API groups but until we refresh, Velero doesn't know about the
-    // newly-added API groups in order to create the CRs from them.
-    if err := ctx.discoveryHelper.Refresh(); err != nil {
-        // Don't break on error here, since newResources will be the same as the original prioritizedResources,
-        // and thus addedResources will end up being empty and we'll restore nothing.
-        // Since we're continuing the restore, add a warning, not an error.
-        warnings.AddVeleroError(errors.Wrap(err, "error refreshing discovery API"))
-    }
-
-    newResources, err := prioritizeResources(ctx.discoveryHelper, ctx.resourcePriorities, ctx.resourceIncludesExcludes, ctx.log)
-    if err != nil {
-        // If there was an error, then newResources will be nil, so we can continue on the restore.
-        // addedResources will end up being nil, but we should still report this failure.
-        warnings.AddVeleroError(errors.Wrap(err, "error sorting resources"))
-    }
-
-    // Filter the resources to only those added since our first restore pass.
-    addedResources := make([]schema.GroupResource, 0)
-    for _, r := range newResources {
-        var found bool
-        for _, p := range ctx.prioritizedResources {
-            if r == p {
-                found = true
-                break
-            }
-        }
-        // Resource hasn't already been processed, so queue it for the next loop.
-        if !found {
-            ctx.log.Debugf("Discovered new resource %s", r)
-            addedResources = append(addedResources, r)
-        }
-    }
-
-    // Use the same restore logic as above, but for newly available API groups (CRDs)
-    for _, resource := range addedResources {
-        resourceList := backupResources[resource.String()]
-        if resourceList == nil {
-            continue
-        }
-
-        for namespace, items := range resourceList.ItemsByNamespace {
-            if namespace != "" && !ctx.namespaceIncludesExcludes.ShouldInclude(namespace) {
-                ctx.log.Infof("Skipping namespace %s", namespace)
-                continue
-            }
-
-            // get target namespace to restore into, if different
-            // from source namespace
-            targetNamespace := namespace
-            if target, ok := ctx.restore.Spec.NamespaceMapping[namespace]; ok {
-                targetNamespace = target
-            }
-
-            // if we don't know whether this namespace exists yet, attempt to create
-            // it in order to ensure it exists. Try to get it from the backup tarball
-            // (in order to get any backed-up metadata), but if we don't find it there,
-            // create a blank one.
-            if namespace != "" && !existingNamespaces.Has(targetNamespace) {
-                logger := ctx.log.WithField("namespace", namespace)
-                ns := getNamespace(logger, getItemFilePath(ctx.restoreDir, "namespaces", "", namespace), targetNamespace)
-                if _, err := kube.EnsureNamespaceExistsAndIsReady(ns, ctx.namespaceClient, ctx.resourceTerminatingTimeout); err != nil {
-                    errs.AddVeleroError(err)
-                    continue
-                }
-
-                // keep track of namespaces that we know exist so we don't
-                // have to try to create them multiple times
-                existingNamespaces.Insert(targetNamespace)
-            }
-
-            w, e := ctx.restoreResource(resource.String(), targetNamespace, namespace, items)
-            warnings.Merge(&w)
-            errs.Merge(&e)
-        }
+
+        // record that we've restored the resource
+        processedResources.Insert(groupResource.String())
+
+        // if we just restored custom resource definitions (CRDs), refresh discovery
+        // because the restored CRDs may have created new APIs that didn't previously
+        // exist in the cluster, and we want to be able to resolve & restore instances
+        // of them in subsequent loop iterations.
+        if groupResource == kuberesource.CustomResourceDefinitions {
+            if err := ctx.discoveryHelper.Refresh(); err != nil {
+                warnings.Add("", errors.Wrap(err, "error refreshing discovery after restoring custom resource definitions"))
+            }
+        }
     }

     // wait for all of the restic restore goroutines to be done, which is
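Putting the pieces of the loop above together, a stripped-down, hypothetical sketch of the new single-pass control flow (plain strings and function values stand in for Velero's discovery helper, includes/excludes handling, and archive types; this is not the actual implementation):

// restoreInOnePass walks the ordered (possibly duplicated) resource list once,
// resolving each entry lazily, skipping entries that cannot be resolved or were
// already handled, and refreshing discovery after CRDs are restored so that APIs
// created by those CRDs become resolvable in later iterations.
func restoreInOnePass(
    ordered []string, // e.g. the output of getOrderedResources
    resolve func(string) (string, bool), // lazy discovery lookup, e.g. "po" -> "pods"
    refreshDiscovery func() error, // re-reads the API server's resource list
    restoreOne func(string), // restores all backed-up items of one resource
) {
    processed := map[string]bool{}
    for _, r := range ordered {
        resolved, ok := resolve(r)
        if !ok {
            continue // API not available (yet); skip rather than fail the restore
        }
        if processed[resolved] {
            continue // listed twice (priority + backup contents); already restored
        }
        restoreOne(resolved)
        processed[resolved] = true

        if resolved == "customresourcedefinitions" {
            _ = refreshDiscovery() // newly-created CRD APIs become visible to later iterations
        }
    }
}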
pkg/restore/restore_test.go

@@ -39,12 +39,11 @@ import (
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/util/sets"
    discoveryfake "k8s.io/client-go/discovery/fake"
    "k8s.io/client-go/dynamic"
    kubefake "k8s.io/client-go/kubernetes/fake"
    kubetesting "k8s.io/client-go/testing"

    velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
    "github.com/vmware-tanzu/velero/pkg/archive"
    "github.com/vmware-tanzu/velero/pkg/builder"
    "github.com/vmware-tanzu/velero/pkg/client"
    "github.com/vmware-tanzu/velero/pkg/discovery"
@@ -55,7 +54,6 @@ import (
     resticmocks "github.com/vmware-tanzu/velero/pkg/restic/mocks"
     "github.com/vmware-tanzu/velero/pkg/test"
-    testutil "github.com/vmware-tanzu/velero/pkg/test"
     "github.com/vmware-tanzu/velero/pkg/util/collections"
     "github.com/vmware-tanzu/velero/pkg/util/encode"
     kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
     "github.com/vmware-tanzu/velero/pkg/volume"
@@ -2485,88 +2483,6 @@ func TestRestoreWithRestic(t *testing.T) {
     }
 }

-func TestPrioritizeResources(t *testing.T) {
-    tests := []struct {
-        name string
-        apiResources map[string][]string
-        priorities []string
-        includes []string
-        excludes []string
-        expected []string
-    }{
-        {
-            name: "priorities & ordering are correctly applied",
-            apiResources: map[string][]string{
-                "v1": {"aaa", "bbb", "configmaps", "ddd", "namespaces", "ooo", "pods", "sss"},
-            },
-            priorities: []string{"namespaces", "configmaps", "pods"},
-            includes: []string{"*"},
-            expected: []string{"namespaces", "configmaps", "pods", "aaa", "bbb", "ddd", "ooo", "sss"},
-        },
-        {
-            name: "includes are correctly applied",
-            apiResources: map[string][]string{
-                "v1": {"aaa", "bbb", "configmaps", "ddd", "namespaces", "ooo", "pods", "sss"},
-            },
-            priorities: []string{"namespaces", "configmaps", "pods"},
-            includes: []string{"namespaces", "aaa", "sss"},
-            expected: []string{"namespaces", "aaa", "sss"},
-        },
-        {
-            name: "excludes are correctly applied",
-            apiResources: map[string][]string{
-                "v1": {"aaa", "bbb", "configmaps", "ddd", "namespaces", "ooo", "pods", "sss"},
-            },
-            priorities: []string{"namespaces", "configmaps", "pods"},
-            includes: []string{"*"},
-            excludes: []string{"ooo", "pods"},
-            expected: []string{"namespaces", "configmaps", "aaa", "bbb", "ddd", "sss"},
-        },
-    }
-
-    logger := testutil.NewLogger()
-
-    for _, tc := range tests {
-        t.Run(tc.name, func(t *testing.T) {
-            discoveryClient := &test.DiscoveryClient{
-                FakeDiscovery: kubefake.NewSimpleClientset().Discovery().(*discoveryfake.FakeDiscovery),
-            }
-
-            helper, err := discovery.NewHelper(discoveryClient, logger)
-            require.NoError(t, err)
-
-            // add all the test case's API resources to the discovery client
-            for gvString, resources := range tc.apiResources {
-                gv, err := schema.ParseGroupVersion(gvString)
-                require.NoError(t, err)
-
-                for _, resource := range resources {
-                    discoveryClient.WithAPIResource(&test.APIResource{
-                        Group: gv.Group,
-                        Version: gv.Version,
-                        Name: resource,
-                    })
-                }
-            }
-
-            require.NoError(t, helper.Refresh())
-
-            includesExcludes := collections.NewIncludesExcludes().Includes(tc.includes...).Excludes(tc.excludes...)
-
-            result, err := prioritizeResources(helper, tc.priorities, includesExcludes, logger)
-            require.NoError(t, err)
-
-            require.Equal(t, len(tc.expected), len(result))
-
-            for i := range result {
-                if e, a := tc.expected[i], result[i].Resource; e != a {
-                    t.Errorf("index %d, expected %s, got %s", i, e, a)
-                }
-            }
-        })
-    }
-}
-
 func TestResetMetadataAndStatus(t *testing.T) {
     tests := []struct {
         name string
@@ -2681,6 +2597,49 @@ func TestGetItemFilePath(t *testing.T) {
     assert.Equal(t, "root/resources/resource/namespaces/namespace/item.json", res)
 }

+func Test_getOrderedResources(t *testing.T) {
+    tests := []struct {
+        name string
+        resourcePriorities []string
+        backupResources map[string]*archive.ResourceItems
+        want []string
+    }{
+        {
+            name: "when only priorities are specified, they're returned in order",
+            resourcePriorities: []string{"prio-3", "prio-2", "prio-1"},
+            backupResources: nil,
+            want: []string{"prio-3", "prio-2", "prio-1"},
+        },
+        {
+            name: "when only backup resources are specified, they're returned in alphabetical order",
+            resourcePriorities: nil,
+            backupResources: map[string]*archive.ResourceItems{
+                "backup-resource-3": nil,
+                "backup-resource-2": nil,
+                "backup-resource-1": nil,
+            },
+            want: []string{"backup-resource-1", "backup-resource-2", "backup-resource-3"},
+        },
+        {
+            name: "when priorities and backup resources are specified, they're returned in the correct order",
+            resourcePriorities: []string{"prio-3", "prio-2", "prio-1"},
+            backupResources: map[string]*archive.ResourceItems{
+                "prio-3": nil,
+                "backup-resource-3": nil,
+                "backup-resource-2": nil,
+                "backup-resource-1": nil,
+            },
+            want: []string{"prio-3", "prio-2", "prio-1", "backup-resource-1", "backup-resource-2", "backup-resource-3", "prio-3"},
+        },
+    }
+
+    for _, tc := range tests {
+        t.Run(tc.name, func(t *testing.T) {
+            assert.Equal(t, tc.want, getOrderedResources(tc.resourcePriorities, tc.backupResources))
+        })
+    }
+}
+
 // assertResourceCreationOrder ensures that resources were created in the expected
 // order. Any resources *not* in resourcePriorities are required to come *after* all
 // prioritized resources, in any order.
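A hedged sketch of what such an ordering check could look like (a hypothetical helper, not the project's actual test code):

import "testing"

// assertCreationOrder fails the test if any resource that is not in priorities
// was created before the last prioritized resource, i.e. it checks that all
// prioritized resources come first and everything else follows in any order.
func assertCreationOrder(t *testing.T, priorities, created []string) {
    prioritized := map[string]bool{}
    for _, p := range priorities {
        prioritized[p] = true
    }

    lastPrioritized := -1      // index of the last prioritized resource created
    firstOther := len(created) // index of the first non-prioritized resource created
    for i, r := range created {
        if prioritized[r] {
            lastPrioritized = i
        } else if firstOther == len(created) {
            firstOther = i
        }
    }

    if firstOther < lastPrioritized {
        t.Errorf("non-prioritized resource %q was created before prioritized resource %q", created[firstOther], created[lastPrioritized])
    }
}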