diff --git a/changelogs/unreleased/1937-nrb b/changelogs/unreleased/1937-nrb new file mode 100644 index 000000000..85f0d9af5 --- /dev/null +++ b/changelogs/unreleased/1937-nrb @@ -0,0 +1 @@ +Wait for CustomResourceDefinitions to be ready before restoring CustomResources. Also refresh the resource list from the Kubernetes API server after restoring CRDs in order to properly restore CRs. diff --git a/pkg/builder/customresourcedefinition_builder.go b/pkg/builder/customresourcedefinition_builder.go index ddfedaf60..847492884 100644 --- a/pkg/builder/customresourcedefinition_builder.go +++ b/pkg/builder/customresourcedefinition_builder.go @@ -41,12 +41,18 @@ func ForCustomResourceDefinition(name string) *CustomResourceDefinitionBuilder { } } +// Condition adds a CustomResourceDefinitionCondition object to a CustomResourceDefinitionBuilder. +func (c *CustomResourceDefinitionBuilder) Condition(cond apiextv1beta1.CustomResourceDefinitionCondition) *CustomResourceDefinitionBuilder { + c.object.Status.Conditions = append(c.object.Status.Conditions, cond) + return c +} + // Result returns the built CustomResourceDefinition. func (b *CustomResourceDefinitionBuilder) Result() *apiextv1beta1.CustomResourceDefinition { return b.object } -// ObjectMeta applies functional options to the Namespace's ObjectMeta. +// ObjectMeta applies functional options to the CustomResourceDefinition's ObjectMeta. func (b *CustomResourceDefinitionBuilder) ObjectMeta(opts ...ObjectMetaOpt) *CustomResourceDefinitionBuilder { for _, opt := range opts { opt(b.object) @@ -54,3 +60,32 @@ func (b *CustomResourceDefinitionBuilder) ObjectMeta(opts ...ObjectMetaOpt) *Cus return b } + +// CustomResourceDefinitionConditionBuilder builds CustomResourceDefinitionCondition objects. +type CustomResourceDefinitionConditionBuilder struct { + object apiextv1beta1.CustomResourceDefinitionCondition +} + +// ForCustomResourceDefinitionCondition is the constructor for a CustomResourceDefinitionConditionBuilder. +func ForCustomResourceDefinitionCondition() *CustomResourceDefinitionConditionBuilder { + return &CustomResourceDefinitionConditionBuilder{ + object: apiextv1beta1.CustomResourceDefinitionCondition{}, + } +} + +// Type sets the Condition's type. +func (c *CustomResourceDefinitionConditionBuilder) Type(t apiextv1beta1.CustomResourceDefinitionConditionType) *CustomResourceDefinitionConditionBuilder { + c.object.Type = t + return c +} + +// Status sets the Condition's status. +func (c *CustomResourceDefinitionConditionBuilder) Status(cs apiextv1beta1.ConditionStatus) *CustomResourceDefinitionConditionBuilder { + c.object.Status = cs + return c +} + +// Result returns the built CustomResourceDefinitionCondition. +func (b *CustomResourceDefinitionConditionBuilder) Result() apiextv1beta1.CustomResourceDefinitionCondition { + return b.object +} diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index c9264eee5..fc2484c01 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -440,7 +440,9 @@ func (s *server) validateBackupStorageLocations() error { return nil } -// - Namespaces go first because all namespaced resources depend on them. +// - Custom Resource Definitions come before Custom Resources so that they can be +// restored with their corresponding CRD. +// - Namespaces go second because all namespaced resources depend on them. // - Storage Classes are needed to create PVs and PVCs correctly. // - PVs go before PVCs because PVCs depend on them. 
// - PVCs go before pods or controllers so they can be mounted as volumes. @@ -452,9 +454,8 @@ func (s *server) validateBackupStorageLocations() error { // have restic restores run before controllers adopt the pods. // - Replica sets go before deployments/other controllers so they can be explicitly // restored and be adopted by controllers. -// - Custom Resource Definitions come before Custom Resource so that they can be -// restored with their corresponding CRD. var defaultRestorePriorities = []string{ + "customresourcedefinitions", "namespaces", "storageclasses", "persistentvolumes", @@ -469,7 +470,6 @@ var defaultRestorePriorities = []string{ // to ensure that we prioritize restoring from "apps" too, since this is how they're stored // in the backup. "replicasets.apps", - "customresourcedefinitions", } func (s *server) initRestic() error { diff --git a/pkg/install/install.go b/pkg/install/install.go index 44c58daea..df8932390 100644 --- a/pkg/install/install.go +++ b/pkg/install/install.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "github.com/vmware-tanzu/velero/pkg/client" + "github.com/vmware-tanzu/velero/pkg/util/kube" ) // kindToResource translates a Kind (mixed case, singular) to a Resource (lowercase, plural) string. @@ -56,21 +57,6 @@ type ResourceGroup struct { OtherResources []*unstructured.Unstructured } -// crdIsReady checks a CRD to see if it's ready, so that objects may be created from it. -func crdIsReady(crd *apiextv1beta1.CustomResourceDefinition) bool { - var isEstablished, namesAccepted bool - for _, cond := range crd.Status.Conditions { - if cond.Type == apiextv1beta1.Established { - isEstablished = true - } - if cond.Type == apiextv1beta1.NamesAccepted { - namesAccepted = true - } - } - - return (isEstablished && namesAccepted) -} - // crdsAreReady polls the API server to see if the BackupStorageLocation and VolumeSnapshotLocation CRDs are ready to create objects. 
func crdsAreReady(factory client.DynamicFactory, crdKinds []string) (bool, error) { gvk := schema.FromAPIVersionAndKind(apiextv1beta1.SchemeGroupVersion.String(), "CustomResourceDefinition") @@ -108,7 +94,7 @@ func crdsAreReady(factory client.DynamicFactory, crdKinds []string) (bool, error } for _, crd := range foundCRDs { - if !crdIsReady(crd) { + if !kube.IsCRDReady(crd) { return false, nil } diff --git a/pkg/kuberesource/kuberesource.go b/pkg/kuberesource/kuberesource.go index 74bd59479..3a27686eb 100644 --- a/pkg/kuberesource/kuberesource.go +++ b/pkg/kuberesource/kuberesource.go @@ -21,12 +21,13 @@ import ( ) var ( - ClusterRoleBindings = schema.GroupResource{Group: "rbac.authorization.k8s.io", Resource: "clusterrolebindings"} - ClusterRoles = schema.GroupResource{Group: "rbac.authorization.k8s.io", Resource: "clusterroles"} - Jobs = schema.GroupResource{Group: "batch", Resource: "jobs"} - Namespaces = schema.GroupResource{Group: "", Resource: "namespaces"} - PersistentVolumeClaims = schema.GroupResource{Group: "", Resource: "persistentvolumeclaims"} - PersistentVolumes = schema.GroupResource{Group: "", Resource: "persistentvolumes"} - Pods = schema.GroupResource{Group: "", Resource: "pods"} - ServiceAccounts = schema.GroupResource{Group: "", Resource: "serviceaccounts"} + ClusterRoleBindings = schema.GroupResource{Group: "rbac.authorization.k8s.io", Resource: "clusterrolebindings"} + ClusterRoles = schema.GroupResource{Group: "rbac.authorization.k8s.io", Resource: "clusterroles"} + CustomResourceDefinitions = schema.GroupResource{Group: "apiextensions.k8s.io", Resource: "customresourcedefinitions"} + Jobs = schema.GroupResource{Group: "batch", Resource: "jobs"} + Namespaces = schema.GroupResource{Group: "", Resource: "namespaces"} + PersistentVolumeClaims = schema.GroupResource{Group: "", Resource: "persistentvolumeclaims"} + PersistentVolumes = schema.GroupResource{Group: "", Resource: "persistentvolumes"} + Pods = schema.GroupResource{Group: "", Resource: "pods"} + ServiceAccounts = schema.GroupResource{Group: "", Resource: "serviceaccounts"} ) diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go index e3a6a49ca..4980d428b 100644 --- a/pkg/restore/restore.go +++ b/pkg/restore/restore.go @@ -284,6 +284,8 @@ func (kr *kubernetesRestorer) Restore( restoredItems: make(map[velero.ResourceIdentifier]struct{}), renamedPVs: make(map[string]string), pvRenamer: kr.pvRenamer, + discoveryHelper: kr.discoveryHelper, + resourcePriorities: kr.resourcePriorities, } return restoreCtx.execute() @@ -377,6 +379,8 @@ type context struct { restoredItems map[velero.ResourceIdentifier]struct{} renamedPVs map[string]string pvRenamer func(string) (string, error) + discoveryHelper discovery.Helper + resourcePriorities []string } type resourceClientKey struct { @@ -456,6 +460,84 @@ func (ctx *context) execute() (Result, Result) { } } + // TODO: Re-order this logic so that CRs can be prioritized in the main loop, rather than after. + + // Refresh and resolve based on CRDs added to the API server from the above restore loop. + // Restoring CRDs above registered new API groups with the API server, but until we refresh discovery, + // Velero doesn't know about those groups and therefore can't restore the CRs that belong to them. + if err := ctx.discoveryHelper.Refresh(); err != nil { + // Don't bail out on error here: if the refresh failed, newResources will be the same as the original prioritizedResources, + // so addedResources will end up empty and the second pass below will simply restore nothing extra. 
+ // Since we're continuing the restore, add a warning, not an error. + addVeleroError(&warnings, errors.Wrap(err, "error refreshing discovery API")) + } + newResources, err := prioritizeResources(ctx.discoveryHelper, ctx.resourcePriorities, ctx.resourceIncludesExcludes, ctx.log) + if err != nil { + // If there was an error, then newResources will be nil, so we can continue with the restore; + // addedResources will end up being nil, but we should still report the failure as a warning. + addVeleroError(&warnings, errors.Wrap(err, "error sorting resources")) + } + + // Filter the resources to only those added since our first restore pass. + addedResources := make([]schema.GroupResource, 0) + for _, r := range newResources { + var found bool + for _, p := range ctx.prioritizedResources { + if r == p { + found = true + break + } + } + // Resource hasn't already been processed, so queue it for the next loop. + if !found { + ctx.log.Debugf("Discovered new resource %s", r) + addedResources = append(addedResources, r) + } + } + + // Use the same restore logic as above, but for newly available API groups (CRDs). + for _, resource := range addedResources { + resourceList := backupResources[resource.String()] + if resourceList == nil { + continue + } + + for namespace, items := range resourceList.ItemsByNamespace { + if namespace != "" && !ctx.namespaceIncludesExcludes.ShouldInclude(namespace) { + ctx.log.Infof("Skipping namespace %s", namespace) + continue + } + + // get target namespace to restore into, if different + // from source namespace + targetNamespace := namespace + if target, ok := ctx.restore.Spec.NamespaceMapping[namespace]; ok { + targetNamespace = target + } + + // if we don't know whether this namespace exists yet, attempt to create + // it in order to ensure it exists. Try to get it from the backup tarball + // (in order to get any backed-up metadata), but if we don't find it there, + // create a blank one. + if namespace != "" && !existingNamespaces.Has(targetNamespace) { + logger := ctx.log.WithField("namespace", namespace) + ns := getNamespace(logger, getItemFilePath(ctx.restoreDir, "namespaces", "", namespace), targetNamespace) + if _, err := kube.EnsureNamespaceExistsAndIsReady(ns, ctx.namespaceClient, ctx.resourceTerminatingTimeout); err != nil { + addVeleroError(&errs, err) + continue + } + + // keep track of namespaces that we know exist so we don't + // have to try to create them multiple times + existingNamespaces.Insert(targetNamespace) + } + + w, e := ctx.restoreResource(resource.String(), targetNamespace, namespace, items) + merge(&warnings, &w) + merge(&errs, &e) + } + } + + // wait for all of the restic restore goroutines to be done, which is // only possible once all of their errors have been received by the loop // below, then close the resticErrs channel so the loop terminates. @@ -672,6 +754,42 @@ func (ctx *context) shouldRestore(name string, pvClient client.Dynamic) (bool, e return shouldRestore, err } +// crdAvailable waits for a CRD to be available for use before letting the restore continue. 
+func (ctx *context) crdAvailable(name string, crdClient client.Dynamic) (bool, error) { + crdLogger := ctx.log.WithField("crdName", name) + + var available bool + // Wait 1 minute rather than the standard resource timeout, since each CRD will transition fairly quickly + err := wait.PollImmediate(time.Second, time.Minute*1, func() (bool, error) { + unstructuredCRD, err := crdClient.Get(name, metav1.GetOptions{}) + if err != nil { + return true, err + } + + // TODO: Due to upstream conversion issues in runtime.FromUnstructured, we use the unstructured object here. + // Once the upstream conversion functions are fixed, we should convert to the CRD types and use IsCRDReady + available, err = kube.IsUnstructuredCRDReady(unstructuredCRD) + + if err != nil { + return true, err + } + + if !available { + crdLogger.Debug("CRD not yet ready for use") + } + + // If the CRD is not available, keep polling (false, nil) + // If the CRD is available, break the poll and return back to caller (true, nil) + return available, nil + }) + + if err == wait.ErrWaitTimeout { + crdLogger.Debug("timeout reached waiting for custom resource definition to be ready") + } + + return available, err +} + // restoreResource restores the specified cluster or namespace scoped resource. If namespace is // empty we are restoring a cluster level resource, otherwise into the specified namespace. func (ctx *context) restoreResource(resource, targetNamespace, originalNamespace string, items []string) (Result, Result) { @@ -1113,6 +1231,17 @@ func (ctx *context) restoreItem(obj *unstructured.Unstructured, groupResource sc restorePodVolumeBackups(ctx, createdObj, originalNamespace) } + // Wait for a CRD to be available for instantiating resources + // before continuing. + if groupResource == kuberesource.CustomResourceDefinitions { + available, err := ctx.crdAvailable(name, resourceClient) + if err != nil { + addToResult(&errs, namespace, errors.Wrapf(err, "error verifying custom resource definition is ready to use")) + } else if !available { + addToResult(&errs, namespace, fmt.Errorf("CRD %s is not available to use for custom resources.", name)) + } + } + return warnings, errs } diff --git a/pkg/util/kube/utils.go b/pkg/util/kube/utils.go index d20037e78..a78502b80 100644 --- a/pkg/util/kube/utils.go +++ b/pkg/util/kube/utils.go @@ -1,5 +1,5 @@ /* -Copyright 2017 the Velero contributors. +Copyright 2017, 2019 the Velero contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -22,8 +22,10 @@ import ( "github.com/pkg/errors" corev1api "k8s.io/api/core/v1" + apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/wait" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" corev1listers "k8s.io/client-go/listers/core/v1" @@ -135,3 +137,73 @@ func GetVolumeDirectory(pod *corev1api.Pod, volumeName string, pvcLister corev1l return pvc.Spec.VolumeName, nil } + +// IsCRDReady checks a CRD to see if it's ready, with both the Established and NamesAccepted conditions. 
+func IsCRDReady(crd *apiextv1beta1.CustomResourceDefinition) bool { + var isEstablished, namesAccepted bool + for _, cond := range crd.Status.Conditions { + if cond.Type == apiextv1beta1.Established && cond.Status == apiextv1beta1.ConditionTrue { + isEstablished = true + } + if cond.Type == apiextv1beta1.NamesAccepted && cond.Status == apiextv1beta1.ConditionTrue { + namesAccepted = true + } + } + + return (isEstablished && namesAccepted) +} + +// IsUnstructuredCRDReady checks an unstructured CRD to see if it's ready, with both the Established and NamesAccepted conditions. +// TODO: Delete this function and use IsCRDReady when the upstream runtime.FromUnstructured function properly handles int64 field conversions. +// Duplicated function because the velero install package uses IsCRDReady with the beta types. +// See https://github.com/kubernetes/kubernetes/issues/87675 +func IsUnstructuredCRDReady(crd *unstructured.Unstructured) (bool, error) { + var isEstablished, namesAccepted bool + + conditions, ok, err := unstructured.NestedSlice(crd.UnstructuredContent(), "status", "conditions") + if !ok { + return false, nil + } + if err != nil { + return false, errors.Wrap(err, "unable to access CRD's conditions") + } + + for _, c := range conditions { + // Unlike the typed version of this function, we need to cast the Condition since it's an interface{} here, + // then we fetch the type and status of the Condition before inspecting them for relevant values + cond, ok := c.(map[string]interface{}) + if !ok { + return false, errors.New("unable to convert condition to map[string]interface{}") + } + conditionType, ok, err := unstructured.NestedString(cond, "type") + if !ok { + // This should never happen unless someone manually edits the serialized data. + return false, errors.New("condition missing a type") + } + + if err != nil { + return false, errors.Wrap(err, "unable to access condition's type") + } + + status, ok, err := unstructured.NestedString(cond, "status") + if !ok { + // This should never happen unless someone manually edits the serialized data. + return false, errors.New("condition missing a status") + } + + if err != nil { + return false, errors.Wrap(err, "unable to access condition's status") + } + + // Here is the actual logic of the function + // Cast the API's types into strings since we're pulling strings out of the unstructured data. + if conditionType == string(apiextv1beta1.Established) && status == string(apiextv1beta1.ConditionTrue) { + isEstablished = true + } + if conditionType == string(apiextv1beta1.NamesAccepted) && status == string(apiextv1beta1.ConditionTrue) { + namesAccepted = true + } + } + + return (isEstablished && namesAccepted), nil +} diff --git a/pkg/util/kube/utils_test.go b/pkg/util/kube/utils_test.go index 2c572b221..7b2c0cccc 100644 --- a/pkg/util/kube/utils_test.go +++ b/pkg/util/kube/utils_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package kube import ( + "encoding/json" "testing" "time" @@ -24,8 +25,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" + apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" kubeinformers "k8s.io/client-go/informers" @@ -197,3 +201,137 @@ func TestGetVolumeDirectorySuccess(t *testing.T) { assert.Equal(t, tc.want, dir) } } + +func TestIsCRDReady(t *testing.T) { + tests := []struct { + name string + crd *apiextv1beta1.CustomResourceDefinition + want bool + }{ + { + name: "CRD is not established & not accepting names - not ready", + crd: builder.ForCustomResourceDefinition("MyCRD").Result(), + want: false, + }, + { + name: "CRD is established & not accepting names - not ready", + crd: builder.ForCustomResourceDefinition("MyCRD"). + Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.Established).Status(apiextv1beta1.ConditionTrue).Result()).Result(), + want: false, + }, + { + name: "CRD is not established & accepting names - not ready", + crd: builder.ForCustomResourceDefinition("MyCRD"). + Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.NamesAccepted).Status(apiextv1beta1.ConditionTrue).Result()).Result(), + want: false, + }, + { + name: "CRD is established & accepting names - ready", + crd: builder.ForCustomResourceDefinition("MyCRD"). + Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.Established).Status(apiextv1beta1.ConditionTrue).Result()). + Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.NamesAccepted).Status(apiextv1beta1.ConditionTrue).Result()). + Result(), + want: true, + }, + } + + for _, tc := range tests { + result := IsCRDReady(tc.crd) + assert.Equal(t, tc.want, result) + } +} + +func TestIsUnstructuredCRDReady(t *testing.T) { + tests := []struct { + name string + crd *apiextv1beta1.CustomResourceDefinition + want bool + }{ + { + name: "CRD is not established & not accepting names - not ready", + crd: builder.ForCustomResourceDefinition("MyCRD").Result(), + want: false, + }, + { + name: "CRD is established & not accepting names - not ready", + crd: builder.ForCustomResourceDefinition("MyCRD"). + Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.Established).Status(apiextv1beta1.ConditionTrue).Result()).Result(), + want: false, + }, + { + name: "CRD is not established & accepting names - not ready", + crd: builder.ForCustomResourceDefinition("MyCRD"). + Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.NamesAccepted).Status(apiextv1beta1.ConditionTrue).Result()).Result(), + want: false, + }, + { + name: "CRD is established & accepting names - ready", + crd: builder.ForCustomResourceDefinition("MyCRD"). + Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.Established).Status(apiextv1beta1.ConditionTrue).Result()). + Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.NamesAccepted).Status(apiextv1beta1.ConditionTrue).Result()). 
+ Result(), + want: true, + }, + } + + for _, tc := range tests { + m, err := runtime.DefaultUnstructuredConverter.ToUnstructured(tc.crd) + require.NoError(t, err) + result, err := IsUnstructuredCRDReady(&unstructured.Unstructured{Object: m}) + require.NoError(t, err) + assert.Equal(t, tc.want, result) + } +} + +// TestFromUnstructuredIntToFloatBug tests for a bug where runtime.DefaultUnstructuredConverter.FromUnstructured can't convert a whole number into a float64 field. +// This test should fail when https://github.com/kubernetes/kubernetes/issues/87675 is fixed upstream, letting us know we can remove the IsUnstructuredCRDReady function. +func TestFromUnstructuredIntToFloatBug(t *testing.T) { + b := []byte(` +{ + "apiVersion": "apiextensions.k8s.io/v1beta1", + "kind": "CustomResourceDefinition", + "metadata": { + "name": "foos.example.foo.com" + }, + "spec": { + "group": "example.foo.com", + "version": "v1alpha1", + "scope": "Namespaced", + "names": { + "plural": "foos", + "singular": "foo", + "kind": "Foo" + }, + "validation": { + "openAPIV3Schema": { + "required": [ + "spec" + ], + "properties": { + "spec": { + "required": [ + "bar" + ], + "properties": { + "bar": { + "type": "integer", + "minimum": 1 + } + } + } + } + } + } + } +} +`) + + var obj unstructured.Unstructured + err := json.Unmarshal(b, &obj) + require.NoError(t, err) + + var newCRD apiextv1beta1.CustomResourceDefinition + err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), &newCRD) + // If there's no error, then the upstream issue is fixed, and we need to remove our workarounds. + require.Error(t, err) +}
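For context, a minimal usage sketch (not part of the patch) of how the new kube.IsCRDReady helper could be driven from the typed apiextensions clientset to wait for a CRD, analogous to what crdAvailable does in pkg/restore/restore.go with the dynamic client and IsUnstructuredCRDReady. The waitForCRDReady name, the one-second poll interval, the one-minute timeout, and the pre-context-argument client-go Get signature are illustrative assumptions, not APIs introduced by this change.

package example

import (
	"time"

	apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"

	"github.com/vmware-tanzu/velero/pkg/util/kube"
)

// waitForCRDReady polls the named CRD until kube.IsCRDReady reports that both
// the Established and NamesAccepted conditions are True, or the timeout expires.
// Illustrative helper only; the interval and timeout mirror crdAvailable's choices.
func waitForCRDReady(client apiextclient.Interface, name string) error {
	return wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
		crd, err := client.ApiextensionsV1beta1().CustomResourceDefinitions().Get(name, metav1.GetOptions{})
		if err != nil {
			// Returning a non-nil error aborts the poll immediately.
			return false, err
		}
		// Keep polling until both readiness conditions are set.
		return kube.IsCRDReady(crd), nil
	})
}

The restore path itself can't take this typed approach yet: as the TODO in crdAvailable and TestFromUnstructuredIntToFloatBug note, runtime.DefaultUnstructuredConverter.FromUnstructured currently fails on CRD schemas containing whole-number fields, so restore.go works with the unstructured object and IsUnstructuredCRDReady instead.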