/*
Copyright the Velero contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package install
import (
	"context"
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/pkg/errors"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/wait"
	kbclient "sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/vmware-tanzu/velero/pkg/client"
	"github.com/vmware-tanzu/velero/pkg/util/kube"
)
// kindToResource translates a Kind (mixed case, singular) to a Resource (lowercase, plural) string.
// This is to accommodate the dynamic client's need for an APIResource, as the Unstructured objects do not have easy helpers for this information.
var kindToResource = map[string]string{
	"CustomResourceDefinition": "customresourcedefinitions",
	"Namespace":                "namespaces",
	"ClusterRoleBinding":       "clusterrolebindings",
	"ServiceAccount":           "serviceaccounts",
	"Deployment":               "deployments",
	"DaemonSet":                "daemonsets",
	"Secret":                   "secrets",
	"BackupStorageLocation":    "backupstoragelocations",
	"VolumeSnapshotLocation":   "volumesnapshotlocations",
}

// ResourceGroup represents a collection of kubernetes objects with a common ready condition
type ResourceGroup struct {
	CRDResources   []*unstructured.Unstructured
	OtherResources []*unstructured.Unstructured
}
// crdV1Beta1ReadinessFn returns a function that can be used for polling to check
// if the provided unstructured v1beta1 CRDs are ready for use in the cluster.
func crdV1Beta1ReadinessFn(kbClient kbclient.Client, unstructuredCrds []*unstructured.Unstructured) func() (bool, error) {
	// Track all the CRDs that have been found and are in a ready state.
	// len should be equal to len(unstructuredCrds) in the happy path.
	return func() (bool, error) {
		foundCRDs := make([]*apiextv1beta1.CustomResourceDefinition, 0)
		for _, unstructuredCrd := range unstructuredCrds {
			crd := &apiextv1beta1.CustomResourceDefinition{}
			key := kbclient.ObjectKey{Name: unstructuredCrd.GetName()}
			err := kbClient.Get(context.Background(), key, crd)
			if apierrors.IsNotFound(err) {
				return false, nil
			} else if err != nil {
				return false, errors.Wrapf(err, "error waiting for %s to be ready", crd.GetName())
			}
			foundCRDs = append(foundCRDs, crd)
		}

		if len(foundCRDs) != len(unstructuredCrds) {
			return false, nil
		}

		for _, crd := range foundCRDs {
			ready := kube.IsV1Beta1CRDReady(crd)
			if !ready {
				return false, nil
			}
		}
		return true, nil
	}
}
// crdV1ReadinessFn returns a function that can be used for polling to check
// if the provided unstructured v1 CRDs are ready for use in the cluster.
func crdV1ReadinessFn(kbClient kbclient.Client, unstructuredCrds []*unstructured.Unstructured) func() (bool, error) {
	return func() (bool, error) {
		foundCRDs := make([]*apiextv1.CustomResourceDefinition, 0)
		for _, unstructuredCrd := range unstructuredCrds {
			crd := &apiextv1.CustomResourceDefinition{}
			key := kbclient.ObjectKey{Name: unstructuredCrd.GetName()}
			err := kbClient.Get(context.Background(), key, crd)
			if apierrors.IsNotFound(err) {
				return false, nil
			} else if err != nil {
				return false, errors.Wrapf(err, "error waiting for %s to be ready", crd.GetName())
			}
			foundCRDs = append(foundCRDs, crd)
		}

		if len(foundCRDs) != len(unstructuredCrds) {
			return false, nil
		}

		for _, crd := range foundCRDs {
			ready := kube.IsV1CRDReady(crd)
			if !ready {
				return false, nil
			}
		}
		return true, nil
	}
}
// crdsAreReady polls the API server to see if the Velero CRDs are ready to create objects.
func crdsAreReady(kbClient kbclient.Client, crds []*unstructured.Unstructured) (bool, error) {
	if len(crds) == 0 {
		// no CRDs to check so return
		return true, nil
	}

	// We assume that all Velero CRDs have the same GVK so we can use the GVK of the
	// first CRD to determine whether to use the v1beta1 or v1 API during polling.
	gvk := crds[0].GroupVersionKind()

	var crdReadinessFn func() (bool, error)
	if gvk.Version == "v1beta1" {
		crdReadinessFn = crdV1Beta1ReadinessFn(kbClient, crds)
	} else if gvk.Version == "v1" {
		crdReadinessFn = crdV1ReadinessFn(kbClient, crds)
	} else {
		return false, fmt.Errorf("unsupported CRD version %q", gvk.Version)
	}

	err := wait.PollImmediate(time.Second, time.Minute, crdReadinessFn)
	if err != nil {
		return false, errors.Wrap(err, "Error polling for CRDs")
	}
	return true, nil
}
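// waitForCRDs below is a minimal illustrative sketch and is NOT part of the
// original Velero source; the function name is hypothetical. It shows how
// crdsAreReady is intended to be driven: the same unstructured CRD manifests
// that were submitted to the cluster are passed in, so the readiness poll uses
// whichever apiextensions version (v1 or v1beta1) those manifests declare,
// which keeps the check working on Kubernetes 1.22+ clusters where the
// v1beta1 CRD API has been removed.
func waitForCRDs(kbClient kbclient.Client, crds []*unstructured.Unstructured) error {
	// crdsAreReady polls for up to a minute and returns an error on timeout,
	// so a plain error check is enough here.
	if _, err := crdsAreReady(kbClient, crds); err != nil {
		return errors.Wrap(err, "error waiting for Velero CRDs to become ready")
	}
	return nil
}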
func isAvailable(c appsv1.DeploymentCondition) bool {
	// Make sure that the deployment has been available for at least 10 seconds.
	// This is because the deployment can show as Ready momentarily before the pods fall into a CrashLoopBackOff.
	// See podutils.IsPodAvailable upstream for similar logic with pods
	if c.Type == appsv1.DeploymentAvailable && c.Status == corev1.ConditionTrue {
		if !c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(10*time.Second).Before(time.Now()) {
			return true
		}
	}
	return false
}
// DeploymentIsReady will poll the kubernetes API server to see if the velero deployment is ready to service user requests.
func DeploymentIsReady(factory client.DynamicFactory, namespace string) (bool, error) {
	gvk := schema.FromAPIVersionAndKind(appsv1.SchemeGroupVersion.String(), "Deployment")
	apiResource := metav1.APIResource{
		Name:       "deployments",
		Namespaced: true,
	}
	c, err := factory.ClientForGroupVersionResource(gvk.GroupVersion(), apiResource, namespace)
	if err != nil {
		return false, errors.Wrapf(err, "Error creating client for deployment polling")
	}
	// declare this variable out of scope so we can return it
	var isReady bool
	var readyObservations int32
	err = wait.PollImmediate(time.Second, 3*time.Minute, func() (bool, error) {
		unstructuredDeployment, err := c.Get("velero", metav1.GetOptions{})
		if apierrors.IsNotFound(err) {
			return false, nil
		} else if err != nil {
			return false, errors.Wrap(err, "error waiting for deployment to be ready")
		}

		deploy := new(appsv1.Deployment)
		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredDeployment.Object, deploy); err != nil {
			return false, errors.Wrap(err, "error converting deployment from unstructured")
		}

		for _, cond := range deploy.Status.Conditions {
			if isAvailable(cond) {
				readyObservations++
			}
		}
		// Make sure we query the deployment enough times to see the state change, provided there is one.
		if readyObservations > 4 {
			isReady = true
			return true, nil
		} else {
			return false, nil
		}
	})
	return isReady, err
}
// DaemonSetIsReady will poll the kubernetes API server to ensure the restic daemonset is ready, i.e. that
// pods are scheduled and available on all of the desired nodes.
func DaemonSetIsReady(factory client.DynamicFactory, namespace string) (bool, error) {
	gvk := schema.FromAPIVersionAndKind(appsv1.SchemeGroupVersion.String(), "DaemonSet")
	apiResource := metav1.APIResource{
		Name:       "daemonsets",
		Namespaced: true,
	}

	c, err := factory.ClientForGroupVersionResource(gvk.GroupVersion(), apiResource, namespace)
	if err != nil {
		return false, errors.Wrapf(err, "Error creating client for daemonset polling")
	}

	// declare this variable out of scope so we can return it
	var isReady bool
	var readyObservations int32

	err = wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
		unstructuredDaemonSet, err := c.Get("restic", metav1.GetOptions{})
		if apierrors.IsNotFound(err) {
			return false, nil
		} else if err != nil {
			return false, errors.Wrap(err, "error waiting for daemonset to be ready")
		}

		daemonSet := new(appsv1.DaemonSet)
		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredDaemonSet.Object, daemonSet); err != nil {
			return false, errors.Wrap(err, "error converting daemonset from unstructured")
		}

		if daemonSet.Status.NumberAvailable == daemonSet.Status.DesiredNumberScheduled {
			readyObservations++
		}

		// Wait for 5 observations of the daemonset being "ready" to be consistent with our check for
		// the deployment being ready.
		if readyObservations > 4 {
			isReady = true
			return true, nil
		} else {
			return false, nil
		}
	})
	return isReady, err
}
// GroupResources groups resources based on whether the resources are CustomResourceDefinitions or other types of kubernetes objects
// This is useful to wait for readiness before creating CRD objects
func GroupResources(resources *unstructured.UnstructuredList) *ResourceGroup {
	rg := new(ResourceGroup)

	for i, r := range resources.Items {
		if r.GetKind() == "CustomResourceDefinition" {
			rg.CRDResources = append(rg.CRDResources, &resources.Items[i])
			continue
		}
		rg.OtherResources = append(rg.OtherResources, &resources.Items[i])
	}

	return rg
}
// createResource attempts to create a resource in the cluster.
// If the resource already exists in the cluster, it's merely logged.
func createResource(r *unstructured.Unstructured, factory client.DynamicFactory, w io.Writer) error {
	id := fmt.Sprintf("%s/%s", r.GetKind(), r.GetName())

	// Helper to reduce boilerplate message about the same object
	log := func(f string, a ...interface{}) {
		format := strings.Join([]string{id, ": ", f, "\n"}, "")
		fmt.Fprintf(w, format, a...)
	}
	log("attempting to create resource")

	c, err := CreateClient(r, factory, w)
	if err != nil {
		return err
	}

	if _, err := c.Create(r); apierrors.IsAlreadyExists(err) {
		log("already exists, proceeding")
	} else if err != nil {
		return errors.Wrapf(err, "Error creating resource %s", id)
	}

	log("created")
	return nil
}
// CreateClient creates a client for an unstructured resource
func CreateClient(r *unstructured.Unstructured, factory client.DynamicFactory, w io.Writer) (client.Dynamic, error) {
	id := fmt.Sprintf("%s/%s", r.GetKind(), r.GetName())

	// Helper to reduce boilerplate message about the same object
	log := func(f string, a ...interface{}) {
		format := strings.Join([]string{id, ": ", f, "\n"}, "")
		fmt.Fprintf(w, format, a...)
	}
	log("attempting to create resource client")

	gvk := schema.FromAPIVersionAndKind(r.GetAPIVersion(), r.GetKind())

	apiResource := metav1.APIResource{
		Name:       kindToResource[r.GetKind()],
		Namespaced: (r.GetNamespace() != ""),
	}

	c, err := factory.ClientForGroupVersionResource(gvk.GroupVersion(), apiResource, r.GetNamespace())
	if err != nil {
		return nil, errors.Wrapf(err, "Error creating client for resource %s", id)
	}

	return c, nil
}
// Install creates resources on the Kubernetes cluster.
// An unstructured list of resources is sent, one at a time, to the server. These are assumed to be in the preferred order already.
// Resources will be sorted into CustomResourceDefinitions and any other resource type, and the function will wait up to 1 minute
// for CRDs to be ready before proceeding.
// An io.Writer can be used to output to a log or the console.
func Install(dynamicFactory client.DynamicFactory, kbClient kbclient.Client, resources *unstructured.UnstructuredList, w io.Writer) error {
	rg := GroupResources(resources)

	// Install CRDs first
	for _, r := range rg.CRDResources {
		if err := createResource(r, dynamicFactory, w); err != nil {
			return err
		}
	}

	// Wait for CRDs to be ready before proceeding
	fmt.Fprint(w, "Waiting for resources to be ready in cluster...\n")
	_, err := crdsAreReady(kbClient, rg.CRDResources)
	if err == wait.ErrWaitTimeout {
		return errors.Errorf("timeout reached, CRDs not ready")
	} else if err != nil {
		return err
	}

	// Install all other resources
	for _, r := range rg.OtherResources {
		if err = createResource(r, dynamicFactory, w); err != nil {
			return err
		}
	}

	return nil
}
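
// installAndWait below is a minimal illustrative sketch and is NOT part of the
// original Velero source; the function name, and the assumption that a restic
// DaemonSet was included in the installed resources, are hypothetical. It
// shows how the exported pieces in this file fit together: Install applies the
// CRDs, waits for them to become ready, and then creates the remaining
// resources; afterwards the Deployment and DaemonSet readiness helpers can be
// polled until the server components are available (roughly what a caller such
// as the velero install command does when asked to wait).
func installAndWait(dynamicFactory client.DynamicFactory, kbClient kbclient.Client, resources *unstructured.UnstructuredList, namespace string, w io.Writer) error {
	// Apply everything: CRDs first, then all other resources.
	if err := Install(dynamicFactory, kbClient, resources, w); err != nil {
		return err
	}
	// Wait for the velero Deployment to report Available long enough to rule out a crash loop.
	if _, err := DeploymentIsReady(dynamicFactory, namespace); err != nil {
		return errors.Wrap(err, "error waiting for velero deployment to be ready")
	}
	// Wait for the restic DaemonSet to have pods available on all desired nodes.
	if _, err := DaemonSetIsReady(dynamicFactory, namespace); err != nil {
		return errors.Wrap(err, "error waiting for restic daemonset to be ready")
	}
	return nil
}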