fix issue 4874

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
Lyndon-Li
2022-09-07 21:44:45 +08:00
parent 8888f8765e
commit a12024887f
5 changed files with 107 additions and 11 deletions

View File

@@ -85,6 +85,8 @@ import (
"github.com/vmware-tanzu/velero/pkg/podvolume"
"github.com/vmware-tanzu/velero/pkg/repository"
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
)
const (
@@ -112,9 +114,6 @@ const (
// defaultCredentialsDirectory is the path on disk where credential
// files will be written to
defaultCredentialsDirectory = "/tmp/credentials"
// daemonSet is the name of the Velero restic daemonset.
daemonSet = "restic"
)
type serverConfig struct {
@@ -535,7 +534,7 @@ var defaultRestorePriorities = []string{
func (s *server) initRestic() error {
// warn if restic daemonset does not exist
if _, err := s.kubeClient.AppsV1().DaemonSets(s.namespace).Get(s.ctx, daemonSet, metav1.GetOptions{}); apierrors.IsNotFound(err) {
if err := nodeagent.IsRunning(s.ctx, s.kubeClient, s.namespace); err == nodeagent.DaemonsetNotFound {
s.logger.Warn("Velero restic daemonset not found; restic backups/restores will not work until it's created")
} else if err != nil {
s.logger.WithError(errors.WithStack(err)).Warn("Error checking for existence of velero restic daemonset")
@@ -619,7 +618,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
client.NewDynamicFactory(s.dynamicClient),
podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),
podvolume.NewBackupperFactory(s.repoLocker, s.repoEnsurer, s.veleroClient, s.kubeClient.CoreV1(),
s.kubeClient.CoreV1(), s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger),
s.kubeClient.CoreV1(), s.kubeClient.CoreV1(),
s.sharedInformerFactory.Velero().V1().BackupRepositories().Informer().HasSynced, s.logger),
s.config.podVolumeOperationTimeout,
s.config.defaultVolumesToRestic,
s.config.clientPageSize,

View File

@@ -0,0 +1,69 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeagent
import (
"context"
"fmt"
"github.com/pkg/errors"
"k8s.io/client-go/kubernetes"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)
const (
// daemonSet is the name of the Velero restic daemonset.
daemonSet = "restic"
)
var (
DaemonsetNotFound = errors.New("daemonset not found")
)
// IsRunning checks if the node agent daemonset is running properly. If not, return the error found
func IsRunning(ctx context.Context, kubeClient kubernetes.Interface, namespace string) error {
if _, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, daemonSet, metav1.GetOptions{}); apierrors.IsNotFound(err) {
return DaemonsetNotFound
} else if err != nil {
return err
} else {
return nil
}
}
// IsRunningInNode checks if the node agent daemonset pod is running properly in a specified node. If not, return the error found
func IsRunningInNode(ctx context.Context, namespace string, nodeName string, podClient corev1client.PodsGetter) error {
if nodeName == "" {
return errors.New("node name is empty")
}
pods, err := podClient.Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("name=%s", daemonSet)})
if err != nil {
return errors.Wrap(err, "failed to list daemonset pods")
}
for _, pod := range pods.Items {
if pod.Spec.NodeName == nodeName {
return nil
}
}
return errors.Errorf("daemonset pod not found in node %s", nodeName)
}

View File

@@ -32,6 +32,7 @@ import (
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
clientset "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
)
@@ -49,6 +50,7 @@ type backupper struct {
veleroClient clientset.Interface
pvcClient corev1client.PersistentVolumeClaimsGetter
pvClient corev1client.PersistentVolumesGetter
podClient corev1client.PodsGetter
uploaderType string
results map[string]chan *velerov1api.PodVolumeBackup
@@ -63,6 +65,7 @@ func newBackupper(
veleroClient clientset.Interface,
pvcClient corev1client.PersistentVolumeClaimsGetter,
pvClient corev1client.PersistentVolumesGetter,
podClient corev1client.PodsGetter,
uploaderType string,
log logrus.FieldLogger,
) *backupper {
@@ -73,6 +76,7 @@ func newBackupper(
veleroClient: veleroClient,
pvcClient: pvcClient,
pvClient: pvClient,
podClient: podClient,
uploaderType: uploaderType,
results: make(map[string]chan *velerov1api.PodVolumeBackup),
@@ -121,6 +125,16 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
return nil, []error{err}
}
err = IsPodQualified(pod)
if err != nil {
return nil, []error{err}
}
err = nodeagent.IsRunningInNode(b.ctx, backup.Namespace, pod.Spec.NodeName, b.podClient)
if err != nil {
return nil, []error{err}
}
// get a single non-exclusive lock since we'll wait for all individual
// backups to be complete before releasing it.
b.repoLocker.Lock(repo.Name)
@@ -167,11 +181,6 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
}
}
// ignore non-running pods
if pod.Status.Phase != corev1api.PodRunning {
log.Warnf("Skipping volume %s in pod %s/%s - pod not running", volumeName, pod.Namespace, pod.Name)
continue
}
// hostPath volumes are not supported because they're not mounted into /var/lib/kubelet/pods, so our
// daemonset pod has no way to access their data.
isHostPath, err := isHostPathVolume(&volume, pvc, b.pvClient.PersistentVolumes())

View File

@@ -43,6 +43,7 @@ func NewBackupperFactory(repoLocker *repository.RepoLocker,
veleroClient clientset.Interface,
pvcClient corev1client.PersistentVolumeClaimsGetter,
pvClient corev1client.PersistentVolumesGetter,
podClient corev1client.PodsGetter,
repoInformerSynced cache.InformerSynced,
log logrus.FieldLogger) BackupperFactory {
return &backupperFactory{
@@ -51,6 +52,7 @@ func NewBackupperFactory(repoLocker *repository.RepoLocker,
veleroClient: veleroClient,
pvcClient: pvcClient,
pvClient: pvClient,
podClient: podClient,
repoInformerSynced: repoInformerSynced,
log: log,
}
@@ -62,6 +64,7 @@ type backupperFactory struct {
veleroClient clientset.Interface
pvcClient corev1client.PersistentVolumeClaimsGetter
pvClient corev1client.PersistentVolumesGetter
podClient corev1client.PodsGetter
repoInformerSynced cache.InformerSynced
log logrus.FieldLogger
}
@@ -77,7 +80,7 @@ func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1ap
},
)
b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, informer, bf.veleroClient, bf.pvcClient, bf.pvClient, uploaderType, bf.log)
b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, informer, bf.veleroClient, bf.pvcClient, bf.pvClient, bf.podClient, uploaderType, bf.log)
go informer.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced, bf.repoInformerSynced) {

View File

@@ -22,6 +22,7 @@ import (
corev1api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/pkg/errors"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
@@ -295,3 +296,17 @@ func GetPodVolumesUsingRestic(pod *corev1api.Pod, defaultVolumesToRestic bool) [
}
return podVolumes
}
// IsPodQualified checks if the pod's status is qualified for a PVB/PVR to backup/restore its volumes.
// If no, return the error found
func IsPodQualified(pod *corev1api.Pod) error {
if pod.Spec.NodeName == "" {
return errors.Errorf("pod is not scheduled, name=%s, namespace=%s, status=%s", pod.Name, pod.Namespace, pod.Status.Phase)
}
if pod.Status.Phase != corev1api.PodRunning {
return errors.Errorf("pod is not running, name=%s, namespace=%s, status=%s", pod.Name, pod.Namespace, pod.Status.Phase)
}
return nil
}