Wait for CRDs to be ready before restoring CRs (#1937)

* Wait for CRDs to be available and ready

When restoring CRDs, we should wait for the definition to be ready and
available before moving on to restoring specific CRs.

While the CRDs are often ready by the time we get to restoring a CR,
there is a race condition where the CRD isn't ready.

This change waits on each CRD at restore time.

Signed-off-by: Nolan Brubaker <brubakern@vmware.com>
This commit is contained in:
Nolan Brubaker
2020-01-30 12:19:13 -05:00
committed by GitHub
parent 710beb96c2
commit 6745979a7b
8 changed files with 392 additions and 30 deletions

View File

@@ -0,0 +1 @@
Wait for CustomResourceDefinitions to be ready before restoring CustomResources. Also refresh the resource list from the Kubernetes API server after restoring CRDs in order to properly restore CRs.

View File

@@ -41,12 +41,18 @@ func ForCustomResourceDefinition(name string) *CustomResourceDefinitionBuilder {
}
}
// Condition appends a CustomResourceDefinitionCondition to the status conditions of the
// CustomResourceDefinition being built and returns the builder so that calls can be chained.
func (b *CustomResourceDefinitionBuilder) Condition(cond apiextv1beta1.CustomResourceDefinitionCondition) *CustomResourceDefinitionBuilder {
	b.object.Status.Conditions = append(b.object.Status.Conditions, cond)
	return b
}
// Result hands back the CustomResourceDefinition that the builder has assembled.
func (b *CustomResourceDefinitionBuilder) Result() *apiextv1beta1.CustomResourceDefinition {
	crd := b.object
	return crd
}
// ObjectMeta applies functional options to the Namespace's ObjectMeta.
// ObjectMeta applies functional options to the CustomResourceDefinition's ObjectMeta.
func (b *CustomResourceDefinitionBuilder) ObjectMeta(opts ...ObjectMetaOpt) *CustomResourceDefinitionBuilder {
for _, opt := range opts {
opt(b.object)
@@ -54,3 +60,32 @@ func (b *CustomResourceDefinitionBuilder) ObjectMeta(opts ...ObjectMetaOpt) *Cus
return b
}
// CustomResourceDefinitionConditionBuilder builds CustomResourceDefinitionCondition objects.
type CustomResourceDefinitionConditionBuilder struct {
// object is the condition under construction; the builder's methods set its fields.
object apiextv1beta1.CustomResourceDefinitionCondition
}
// ForCustomResourceDefinitionCondition is the constructor for a CustomResourceDefinitionConditionBuilder.
// The returned builder wraps a zero-valued CustomResourceDefinitionCondition.
func ForCustomResourceDefinitionCondition() *CustomResourceDefinitionConditionBuilder {
	return new(CustomResourceDefinitionConditionBuilder)
}
// Type sets the Condition's type and returns the builder so that calls can be chained.
func (c *CustomResourceDefinitionConditionBuilder) Type(t apiextv1beta1.CustomResourceDefinitionConditionType) *CustomResourceDefinitionConditionBuilder {
c.object.Type = t
return c
}
// Status sets the Condition's status and returns the builder so that calls can be chained.
func (c *CustomResourceDefinitionConditionBuilder) Status(cs apiextv1beta1.ConditionStatus) *CustomResourceDefinitionConditionBuilder {
c.object.Status = cs
return c
}
// Result returns the built CustomResourceDefinitionCondition (by value).
func (c *CustomResourceDefinitionConditionBuilder) Result() apiextv1beta1.CustomResourceDefinitionCondition {
	return c.object
}

View File

@@ -440,7 +440,9 @@ func (s *server) validateBackupStorageLocations() error {
return nil
}
// - Namespaces go first because all namespaced resources depend on them.
// - Custom Resource Definitions come before Custom Resources so that the Custom Resources can be
// restored with their corresponding CRD.
// - Namespaces go second because all namespaced resources depend on them.
// - Storage Classes are needed to create PVs and PVCs correctly.
// - PVs go before PVCs because PVCs depend on them.
// - PVCs go before pods or controllers so they can be mounted as volumes.
@@ -452,9 +454,8 @@ func (s *server) validateBackupStorageLocations() error {
// have restic restores run before controllers adopt the pods.
// - Replica sets go before deployments/other controllers so they can be explicitly
// restored and be adopted by controllers.
// - Custom Resource Definitions come before Custom Resource so that they can be
// restored with their corresponding CRD.
var defaultRestorePriorities = []string{
"customresourcedefinitions",
"namespaces",
"storageclasses",
"persistentvolumes",
@@ -469,7 +470,6 @@ var defaultRestorePriorities = []string{
// to ensure that we prioritize restoring from "apps" too, since this is how they're stored
// in the backup.
"replicasets.apps",
"customresourcedefinitions",
}
func (s *server) initRestic() error {

View File

@@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
// kindToResource translates a Kind (mixed case, singular) to a Resource (lowercase, plural) string.
@@ -56,21 +57,6 @@ type ResourceGroup struct {
OtherResources []*unstructured.Unstructured
}
// crdIsReady checks a CRD to see if it's ready, so that objects may be created from it.
// A CRD is considered ready only when both its Established and NamesAccepted conditions
// are present with a status of True; a condition that is merely listed (for example with
// status False or Unknown) does not count.
func crdIsReady(crd *apiextv1beta1.CustomResourceDefinition) bool {
	var isEstablished, namesAccepted bool
	for _, cond := range crd.Status.Conditions {
		// The condition's presence alone is not enough: its status must be True.
		if cond.Type == apiextv1beta1.Established && cond.Status == apiextv1beta1.ConditionTrue {
			isEstablished = true
		}
		if cond.Type == apiextv1beta1.NamesAccepted && cond.Status == apiextv1beta1.ConditionTrue {
			namesAccepted = true
		}
	}

	return isEstablished && namesAccepted
}
// crdsAreReady polls the API server to see if the BackupStorageLocation and VolumeSnapshotLocation CRDs are ready to create objects.
func crdsAreReady(factory client.DynamicFactory, crdKinds []string) (bool, error) {
gvk := schema.FromAPIVersionAndKind(apiextv1beta1.SchemeGroupVersion.String(), "CustomResourceDefinition")
@@ -108,7 +94,7 @@ func crdsAreReady(factory client.DynamicFactory, crdKinds []string) (bool, error
}
for _, crd := range foundCRDs {
if !crdIsReady(crd) {
if !kube.IsCRDReady(crd) {
return false, nil
}

View File

@@ -21,12 +21,13 @@ import (
)
var (
ClusterRoleBindings = schema.GroupResource{Group: "rbac.authorization.k8s.io", Resource: "clusterrolebindings"}
ClusterRoles = schema.GroupResource{Group: "rbac.authorization.k8s.io", Resource: "clusterroles"}
Jobs = schema.GroupResource{Group: "batch", Resource: "jobs"}
Namespaces = schema.GroupResource{Group: "", Resource: "namespaces"}
PersistentVolumeClaims = schema.GroupResource{Group: "", Resource: "persistentvolumeclaims"}
PersistentVolumes = schema.GroupResource{Group: "", Resource: "persistentvolumes"}
Pods = schema.GroupResource{Group: "", Resource: "pods"}
ServiceAccounts = schema.GroupResource{Group: "", Resource: "serviceaccounts"}
ClusterRoleBindings = schema.GroupResource{Group: "rbac.authorization.k8s.io", Resource: "clusterrolebindings"}
ClusterRoles = schema.GroupResource{Group: "rbac.authorization.k8s.io", Resource: "clusterroles"}
CustomResourceDefinitions = schema.GroupResource{Group: "apiextensions.k8s.io", Resource: "customresourcedefinitions"}
Jobs = schema.GroupResource{Group: "batch", Resource: "jobs"}
Namespaces = schema.GroupResource{Group: "", Resource: "namespaces"}
PersistentVolumeClaims = schema.GroupResource{Group: "", Resource: "persistentvolumeclaims"}
PersistentVolumes = schema.GroupResource{Group: "", Resource: "persistentvolumes"}
Pods = schema.GroupResource{Group: "", Resource: "pods"}
ServiceAccounts = schema.GroupResource{Group: "", Resource: "serviceaccounts"}
)

View File

@@ -284,6 +284,8 @@ func (kr *kubernetesRestorer) Restore(
restoredItems: make(map[velero.ResourceIdentifier]struct{}),
renamedPVs: make(map[string]string),
pvRenamer: kr.pvRenamer,
discoveryHelper: kr.discoveryHelper,
resourcePriorities: kr.resourcePriorities,
}
return restoreCtx.execute()
@@ -377,6 +379,8 @@ type context struct {
restoredItems map[velero.ResourceIdentifier]struct{}
renamedPVs map[string]string
pvRenamer func(string) (string, error)
discoveryHelper discovery.Helper
resourcePriorities []string
}
type resourceClientKey struct {
@@ -456,6 +460,84 @@ func (ctx *context) execute() (Result, Result) {
}
}
// TODO: Re-order this logic so that CRs can be prioritized in the main loop, rather than after.
// Refresh and resolve based on CRDs added to the API server from the above restore loop.
// This is because CRDs have been added to the API groups but until we refresh, Velero doesn't know about the
// newly-added API groups in order to create the CRs from them.
if err := ctx.discoveryHelper.Refresh(); err != nil {
// Don't break on error here, since newResources will be the same as the original prioritizedResources,
// and thus addedResources will end up being empty and we'll restore nothing.
// Since we're continuing the restore, add a warning, not an error.
addVeleroError(&warnings, errors.Wrap(err, "error refreshing discovery API"))
}
newResources, err := prioritizeResources(ctx.discoveryHelper, ctx.resourcePriorities, ctx.resourceIncludesExcludes, ctx.log)
if err != nil {
// If there was an error, then newResources will be nil, so we can continue on the restore.
// addedResources will end up being nil, but we should still report this failure.
addVeleroError(&warnings, errors.Wrap(err, "error sorting resources"))
}
// Filter the resources to only those added since our first restore pass.
addedResources := make([]schema.GroupResource, 0)
for _, r := range newResources {
var found bool
for _, p := range ctx.prioritizedResources {
if r == p {
found = true
break
}
}
// Resource hasn't already been processed, so queue it for the next loop.
if !found {
ctx.log.Debugf("Discovered new resource %s", r)
addedResources = append(addedResources, r)
}
}
// Use the same restore logic as above, but for newly available API groups (CRDs)
for _, resource := range addedResources {
resourceList := backupResources[resource.String()]
if resourceList == nil {
continue
}
for namespace, items := range resourceList.ItemsByNamespace {
if namespace != "" && !ctx.namespaceIncludesExcludes.ShouldInclude(namespace) {
ctx.log.Infof("Skipping namespace %s", namespace)
continue
}
// get target namespace to restore into, if different
// from source namespace
targetNamespace := namespace
if target, ok := ctx.restore.Spec.NamespaceMapping[namespace]; ok {
targetNamespace = target
}
// if we don't know whether this namespace exists yet, attempt to create
// it in order to ensure it exists. Try to get it from the backup tarball
// (in order to get any backed-up metadata), but if we don't find it there,
// create a blank one.
if namespace != "" && !existingNamespaces.Has(targetNamespace) {
logger := ctx.log.WithField("namespace", namespace)
ns := getNamespace(logger, getItemFilePath(ctx.restoreDir, "namespaces", "", namespace), targetNamespace)
if _, err := kube.EnsureNamespaceExistsAndIsReady(ns, ctx.namespaceClient, ctx.resourceTerminatingTimeout); err != nil {
addVeleroError(&errs, err)
continue
}
// keep track of namespaces that we know exist so we don't
// have to try to create them multiple times
existingNamespaces.Insert(targetNamespace)
}
w, e := ctx.restoreResource(resource.String(), targetNamespace, namespace, items)
merge(&warnings, &w)
merge(&errs, &e)
}
}
// wait for all of the restic restore goroutines to be done, which is
// only possible once all of their errors have been received by the loop
// below, then close the resticErrs channel so the loop terminates.
@@ -672,6 +754,42 @@ func (ctx *context) shouldRestore(name string, pvClient client.Dynamic) (bool, e
return shouldRestore, err
}
// crdAvailable blocks until the named CRD reports itself ready for use, or until polling gives up.
// It returns whether the CRD was observed ready, together with any error from fetching the CRD,
// from inspecting its conditions, or from the poll timing out.
func (ctx *context) crdAvailable(name string, crdClient client.Dynamic) (bool, error) {
	logger := ctx.log.WithField("crdName", name)

	// ready is written by the poll function below and read after polling finishes.
	ready := false

	checkReady := func() (bool, error) {
		obj, getErr := crdClient.Get(name, metav1.GetOptions{})
		if getErr != nil {
			// Returning done=true with an error stops the poll immediately.
			return true, getErr
		}

		// TODO: Due to upstream conversion issues in runtime.FromUnstructured, we use the unstructured object here.
		// Once the upstream conversion functions are fixed, we should convert to the CRD types and use IsCRDReady
		var readyErr error
		ready, readyErr = kube.IsUnstructuredCRDReady(obj)
		if readyErr != nil {
			return true, readyErr
		}

		if !ready {
			logger.Debug("CRD not yet ready for use")
		}

		// (false, nil) keeps polling; (true, nil) ends the poll and returns to the caller.
		return ready, nil
	}

	// Poll once per second for up to 1 minute rather than the standard resource timeout,
	// since each CRD will transition fairly quickly.
	err := wait.PollImmediate(time.Second, time.Minute, checkReady)
	if err == wait.ErrWaitTimeout {
		logger.Debug("timeout reached waiting for custom resource definition to be ready")
	}

	return ready, err
}
// restoreResource restores the specified cluster or namespace scoped resource. If namespace is
// empty we are restoring a cluster level resource, otherwise into the specified namespace.
func (ctx *context) restoreResource(resource, targetNamespace, originalNamespace string, items []string) (Result, Result) {
@@ -1113,6 +1231,17 @@ func (ctx *context) restoreItem(obj *unstructured.Unstructured, groupResource sc
restorePodVolumeBackups(ctx, createdObj, originalNamespace)
}
// Wait for a CRD to be available for instantiating resources
// before continuing.
if groupResource == kuberesource.CustomResourceDefinitions {
available, err := ctx.crdAvailable(name, resourceClient)
if err != nil {
addToResult(&errs, namespace, errors.Wrapf(err, "error verifying custom resource definition is ready to use"))
} else if !available {
addToResult(&errs, namespace, fmt.Errorf("CRD %s is not available to use for custom resources.", name))
}
}
return warnings, errs
}

View File

@@ -1,5 +1,5 @@
/*
Copyright 2017 the Velero contributors.
Copyright 2017, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -22,8 +22,10 @@ import (
"github.com/pkg/errors"
corev1api "k8s.io/api/core/v1"
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/wait"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
@@ -135,3 +137,73 @@ func GetVolumeDirectory(pod *corev1api.Pod, volumeName string, pvcLister corev1l
return pvc.Spec.VolumeName, nil
}
// IsCRDReady reports whether a CRD is ready, meaning that both its Established and
// NamesAccepted conditions are present with a status of True.
func IsCRDReady(crd *apiextv1beta1.CustomResourceDefinition) bool {
	established, accepted := false, false

	for _, condition := range crd.Status.Conditions {
		// Only conditions whose status is True can contribute to readiness.
		if condition.Status != apiextv1beta1.ConditionTrue {
			continue
		}

		switch condition.Type {
		case apiextv1beta1.Established:
			established = true
		case apiextv1beta1.NamesAccepted:
			accepted = true
		}
	}

	return established && accepted
}
// IsUnstructuredCRDReady checks an unstructured CRD to see if it's ready, with both the Established and NamesAccepted conditions.
// It returns (false, nil) when the object has no status.conditions field, and a non-nil error when the conditions,
// or a condition's type or status, are missing or cannot be read as the expected type.
// TODO: Delete this function and use IsCRDReady when the upstream runtime.FromUnstructured function properly handles int64 field conversions.
// Duplicated function because the velero install package uses IsCRDReady with the beta types.
// See https://github.com/kubernetes/kubernetes/issues/87675
func IsUnstructuredCRDReady(crd *unstructured.Unstructured) (bool, error) {
	var isEstablished, namesAccepted bool

	// For every Nested* lookup below, err is checked before ok. The helpers report a field of the
	// wrong type as ok == false together with a non-nil error, so testing ok first would hide the
	// real error behind the "not found" handling.
	conditions, ok, err := unstructured.NestedSlice(crd.UnstructuredContent(), "status", "conditions")
	if err != nil {
		return false, errors.Wrap(err, "unable to access CRD's conditions")
	}
	if !ok {
		// No conditions have been reported yet, so the CRD cannot be ready.
		return false, nil
	}

	for _, c := range conditions {
		// Unlike the typed version of this function, we need to cast the Condition since it's an interface{} here,
		// then we fetch the type and status of the Condition before inspecting them for relevant values
		cond, ok := c.(map[string]interface{})
		if !ok {
			return false, errors.New("unable to convert condition to map[string]interface{}")
		}

		conditionType, ok, err := unstructured.NestedString(cond, "type")
		if err != nil {
			return false, errors.Wrap(err, "unable to access condition's type")
		}
		if !ok {
			// This should never happen unless someone manually edits the serialized data.
			return false, errors.New("condition missing a type")
		}

		status, ok, err := unstructured.NestedString(cond, "status")
		if err != nil {
			return false, errors.Wrap(err, "unable to access condition's status")
		}
		if !ok {
			// This should never happen unless someone manually edits the serialized data.
			return false, errors.New("condition missing a status")
		}

		// Here is the actual logic of the function
		// Cast the API's types into strings since we're pulling strings out of the unstructured data.
		if conditionType == string(apiextv1beta1.Established) && status == string(apiextv1beta1.ConditionTrue) {
			isEstablished = true
		}
		if conditionType == string(apiextv1beta1.NamesAccepted) && status == string(apiextv1beta1.ConditionTrue) {
			namesAccepted = true
		}
	}

	return isEstablished && namesAccepted, nil
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package kube
import (
"encoding/json"
"testing"
"time"
@@ -24,8 +25,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
kubeinformers "k8s.io/client-go/informers"
@@ -197,3 +201,137 @@ func TestGetVolumeDirectorySuccess(t *testing.T) {
assert.Equal(t, tc.want, dir)
}
}
// TestIsCRDReady verifies that IsCRDReady reports a CRD as ready only when both the
// Established and NamesAccepted conditions are present with a status of True.
func TestIsCRDReady(t *testing.T) {
	tests := []struct {
		name string
		crd  *apiextv1beta1.CustomResourceDefinition
		want bool
	}{
		{
			name: "CRD is not established & not accepting names - not ready",
			crd:  builder.ForCustomResourceDefinition("MyCRD").Result(),
			want: false,
		},
		{
			name: "CRD is established & not accepting names - not ready",
			crd: builder.ForCustomResourceDefinition("MyCRD").
				Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.Established).Status(apiextv1beta1.ConditionTrue).Result()).Result(),
			want: false,
		},
		{
			name: "CRD is not established & accepting names - not ready",
			crd: builder.ForCustomResourceDefinition("MyCRD").
				Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.NamesAccepted).Status(apiextv1beta1.ConditionTrue).Result()).Result(),
			want: false,
		},
		{
			name: "CRD is established & accepting names - ready",
			crd: builder.ForCustomResourceDefinition("MyCRD").
				Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.Established).Status(apiextv1beta1.ConditionTrue).Result()).
				Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.NamesAccepted).Status(apiextv1beta1.ConditionTrue).Result()).
				Result(),
			want: true,
		},
		{
			// Both conditions are listed, but neither is True, so the CRD must not be treated as ready.
			name: "CRD has established & names accepted conditions with false status - not ready",
			crd: builder.ForCustomResourceDefinition("MyCRD").
				Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.Established).Status(apiextv1beta1.ConditionFalse).Result()).
				Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.NamesAccepted).Status(apiextv1beta1.ConditionFalse).Result()).
				Result(),
			want: false,
		},
	}

	for _, tc := range tests {
		// Run each case as a named subtest so a failure identifies the scenario that broke.
		t.Run(tc.name, func(t *testing.T) {
			result := IsCRDReady(tc.crd)
			assert.Equal(t, tc.want, result)
		})
	}
}
// TestIsUnstructuredCRDReady converts typed CRDs to their unstructured form and verifies that
// IsUnstructuredCRDReady reports readiness only when both the Established and NamesAccepted
// conditions are present with a status of True.
func TestIsUnstructuredCRDReady(t *testing.T) {
	tests := []struct {
		name string
		crd  *apiextv1beta1.CustomResourceDefinition
		want bool
	}{
		{
			name: "CRD is not established & not accepting names - not ready",
			crd:  builder.ForCustomResourceDefinition("MyCRD").Result(),
			want: false,
		},
		{
			name: "CRD is established & not accepting names - not ready",
			crd: builder.ForCustomResourceDefinition("MyCRD").
				Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.Established).Status(apiextv1beta1.ConditionTrue).Result()).Result(),
			want: false,
		},
		{
			name: "CRD is not established & accepting names - not ready",
			crd: builder.ForCustomResourceDefinition("MyCRD").
				Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.NamesAccepted).Status(apiextv1beta1.ConditionTrue).Result()).Result(),
			want: false,
		},
		{
			name: "CRD is established & accepting names - ready",
			crd: builder.ForCustomResourceDefinition("MyCRD").
				Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.Established).Status(apiextv1beta1.ConditionTrue).Result()).
				Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.NamesAccepted).Status(apiextv1beta1.ConditionTrue).Result()).
				Result(),
			want: true,
		},
		{
			// Both conditions are listed, but neither is True, so the CRD must not be treated as ready.
			name: "CRD has established & names accepted conditions with false status - not ready",
			crd: builder.ForCustomResourceDefinition("MyCRD").
				Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.Established).Status(apiextv1beta1.ConditionFalse).Result()).
				Condition(builder.ForCustomResourceDefinitionCondition().Type(apiextv1beta1.NamesAccepted).Status(apiextv1beta1.ConditionFalse).Result()).
				Result(),
			want: false,
		},
	}

	for _, tc := range tests {
		// Run each case as a named subtest so a failure identifies the scenario that broke.
		t.Run(tc.name, func(t *testing.T) {
			m, err := runtime.DefaultUnstructuredConverter.ToUnstructured(tc.crd)
			require.NoError(t, err)

			result, err := IsUnstructuredCRDReady(&unstructured.Unstructured{Object: m})
			require.NoError(t, err)
			assert.Equal(t, tc.want, result)
		})
	}
}
// TestFromUnstructuredIntToFloatBug pins an upstream bug: runtime.DefaultUnstructuredConverter.FromUnstructured
// cannot convert a whole number into a float field.
// The test expects the conversion to return an error. Once https://github.com/kubernetes/kubernetes/issues/87675
// is fixed upstream this test will start failing, which is the signal that IsUnstructuredCRDReady can be removed.
func TestFromUnstructuredIntToFloatBug(t *testing.T) {
	// A CRD whose validation schema sets "minimum" to the whole number 1.
	crdJSON := []byte(`
{
"apiVersion": "apiextensions.k8s.io/v1beta1",
"kind": "CustomResourceDefinition",
"metadata": {
"name": "foos.example.foo.com"
},
"spec": {
"group": "example.foo.com",
"version": "v1alpha1",
"scope": "Namespaced",
"names": {
"plural": "foos",
"singular": "foo",
"kind": "Foo"
},
"validation": {
"openAPIV3Schema": {
"required": [
"spec"
],
"properties": {
"spec": {
"required": [
"bar"
],
"properties": {
"bar": {
"type": "integer",
"minimum": 1
}
}
}
}
}
}
}
}
`)

	var decoded unstructured.Unstructured
	require.NoError(t, json.Unmarshal(crdJSON, &decoded))

	var typedCRD apiextv1beta1.CustomResourceDefinition
	convertErr := runtime.DefaultUnstructuredConverter.FromUnstructured(decoded.UnstructuredContent(), &typedCRD)

	// If there's no error, then the upstream issue is fixed, and we need to remove our workarounds.
	require.Error(t, convertErr)
}