diff --git a/changelogs/unreleased/8985-Lyndon-Li b/changelogs/unreleased/8985-Lyndon-Li new file mode 100644 index 000000000..7bb89251c --- /dev/null +++ b/changelogs/unreleased/8985-Lyndon-Li @@ -0,0 +1 @@ +Fix issue #8960, implement PodVolume exposer for PVB/PVR \ No newline at end of file diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 5bfec7c87..ede158116 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -76,8 +76,6 @@ const ( // the port where prometheus metrics are exposed defaultMetricsAddress = ":8085" - defaultHostPodsPath = "/host_pods" - defaultResourceTimeout = 10 * time.Minute defaultDataMoverPrepareTimeout = 30 * time.Minute defaultDataPathConcurrentNum = 1 @@ -301,14 +299,14 @@ func (s *nodeAgentServer) run() { credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore} repoEnsurer := repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout) - pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.dataPathMgr, repoEnsurer, + pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.mgr.GetScheme(), s.metrics, s.logger) if err := pvbReconciler.SetupWithManager(s.mgr); err != nil { s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerPodVolumeBackup) } - if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.dataPathMgr, repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil { + if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") } @@ -412,10 +410,10 @@ func (s *nodeAgentServer) waitCacheForResume() error { // validatePodVolumesHostPath validates that the pod volumes path contains a // directory for each Pod running on this node func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface) error { - files, err := s.fileSystem.ReadDir(defaultHostPodsPath) + files, err := s.fileSystem.ReadDir(nodeagent.HostPodVolumeMountPath()) if err != nil { if errors.Is(err, os.ErrNotExist) { - s.logger.Warnf("Pod volumes host path [%s] doesn't exist, fs-backup is disabled", defaultHostPodsPath) + s.logger.Warnf("Pod volumes host path [%s] doesn't exist, fs-backup is disabled", nodeagent.HostPodVolumeMountPath()) return nil } return errors.Wrap(err, "could not read pod volumes host path") @@ -448,7 +446,7 @@ func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface valid = false s.logger.WithFields(logrus.Fields{ "pod": fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName()), - "path": defaultHostPodsPath + "/" + dirName, + "path": nodeagent.HostPodVolumeMountPath() + "/" + dirName, }).Debug("could not find volumes for pod in host path") } } diff --git a/pkg/cmd/cli/nodeagent/server_test.go b/pkg/cmd/cli/nodeagent/server_test.go index 3d0290a70..2551bfac3 100644 --- a/pkg/cmd/cli/nodeagent/server_test.go +++ b/pkg/cmd/cli/nodeagent/server_test.go @@ -99,7 +99,7 @@ func Test_validatePodVolumesHostPath(t *testing.T) { for _, dir := range tt.dirs { if tt.createDir { - err := fs.MkdirAll(filepath.Join(defaultHostPodsPath, dir), os.ModePerm) + err := fs.MkdirAll(filepath.Join(nodeagent.HostPodVolumeMountPath(), dir), os.ModePerm) 
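// Editor's sketch (not part of this patch), assuming a default Linux node-agent install:
// it shows how the new pkg/nodeagent helpers used above resolve the host-pods mount that
// replaces the removed defaultHostPodsPath constant. HostPodVolumeMountPoint is "host_pods"
// (added later in this diff), so HostPodVolumeMountPath() yields the same "/host_pods" value;
// the pod UID below is a hypothetical example only.
package main

import (
	"fmt"
	"path/filepath"

	"github.com/vmware-tanzu/velero/pkg/nodeagent"
)

func main() {
	// Mount path as seen inside the Linux node-agent container.
	fmt.Println(nodeagent.HostPodVolumeMountPath()) // /host_pods

	// validatePodVolumesHostPath expects one directory per pod UID under that mount.
	podUID := "0f8fad5b-d9cb-469f-a165-example-uid" // hypothetical pod UID
	fmt.Println(filepath.Join(nodeagent.HostPodVolumeMountPath(), podUID)) // /host_pods/<pod UID>
}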
if err != nil { t.Error(err) } diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index 03cb3fe8e..254a39c91 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -29,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" clocks "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -48,10 +49,11 @@ import ( const pVBRRequestor string = "pod-volume-backup-restore" // NewPodVolumeBackupReconciler creates the PodVolumeBackupReconciler instance -func NewPodVolumeBackupReconciler(client client.Client, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, +func NewPodVolumeBackupReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, scheme *runtime.Scheme, metrics *metrics.ServerMetrics, logger logrus.FieldLogger) *PodVolumeBackupReconciler { return &PodVolumeBackupReconciler{ Client: client, + kubeClient: kubeClient, logger: logger.WithField("controller", "PodVolumeBackup"), repositoryEnsurer: ensurer, credentialGetter: credentialGetter, @@ -67,6 +69,7 @@ func NewPodVolumeBackupReconciler(client client.Client, dataPathMgr *datapath.Ma // PodVolumeBackupReconciler reconciles a PodVolumeBackup object type PodVolumeBackupReconciler struct { client.Client + kubeClient kubernetes.Interface scheme *runtime.Scheme clock clocks.WithTickerAndDelayedExecution metrics *metrics.ServerMetrics @@ -155,7 +158,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ return r.errorOut(ctx, &pvb, err, fmt.Sprintf("getting pod %s/%s", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name), log) } - path, err := exposer.GetPodVolumeHostPath(ctx, &pod, pvb.Spec.Volume, r.Client, r.fileSystem, log) + path, err := exposer.GetPodVolumeHostPath(ctx, &pod, pvb.Spec.Volume, r.kubeClient, r.fileSystem, log) if err != nil { r.closeDataPath(ctx, pvb.Name) return r.errorOut(ctx, &pvb, err, "error exposing host path for pod volume", log) diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index f4645657f..e65d1b606 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -30,6 +30,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" clocks "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -49,10 +50,11 @@ import ( "github.com/vmware-tanzu/velero/pkg/util/filesystem" ) -func NewPodVolumeRestoreReconciler(client client.Client, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, +func NewPodVolumeRestoreReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, logger logrus.FieldLogger) *PodVolumeRestoreReconciler { return &PodVolumeRestoreReconciler{ Client: client, + kubeClient: kubeClient, logger: logger.WithField("controller", "PodVolumeRestore"), repositoryEnsurer: ensurer, credentialGetter: credentialGetter, @@ -64,6 +66,7 @@ func 
NewPodVolumeRestoreReconciler(client client.Client, dataPathMgr *datapath.M type PodVolumeRestoreReconciler struct { client.Client + kubeClient kubernetes.Interface logger logrus.FieldLogger repositoryEnsurer *repository.Ensurer credentialGetter *credentials.CredentialGetter @@ -135,7 +138,7 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req return c.errorOut(ctx, pvr, err, "error to update status to in progress", log) } - volumePath, err := exposer.GetPodVolumeHostPath(ctx, pod, pvr.Spec.Volume, c.Client, c.fileSystem, log) + volumePath, err := exposer.GetPodVolumeHostPath(ctx, pod, pvr.Spec.Volume, c.kubeClient, c.fileSystem, log) if err != nil { c.closeDataPath(ctx, pvr.Name) return c.errorOut(ctx, pvr, err, "error exposing host path for pod volume", log) diff --git a/pkg/exposer/host_path.go b/pkg/exposer/host_path.go index d249dda39..e51178711 100644 --- a/pkg/exposer/host_path.go +++ b/pkg/exposer/host_path.go @@ -19,13 +19,15 @@ package exposer import ( "context" "fmt" + "strings" "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1api "k8s.io/api/core/v1" - ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + "k8s.io/client-go/kubernetes" "github.com/vmware-tanzu/velero/pkg/datapath" + "github.com/vmware-tanzu/velero/pkg/nodeagent" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/kube" @@ -37,17 +39,17 @@ var singlePathMatch = kube.SinglePathMatch // GetPodVolumeHostPath returns a path that can be accessed from the host for a given volume of a pod func GetPodVolumeHostPath(ctx context.Context, pod *corev1api.Pod, volumeName string, - cli ctrlclient.Client, fs filesystem.Interface, log logrus.FieldLogger) (datapath.AccessPoint, error) { + kubeClient kubernetes.Interface, fs filesystem.Interface, log logrus.FieldLogger) (datapath.AccessPoint, error) { logger := log.WithField("pod name", pod.Name).WithField("pod UID", pod.GetUID()).WithField("volume", volumeName) - volDir, err := getVolumeDirectory(ctx, logger, pod, volumeName, cli) + volDir, err := getVolumeDirectory(ctx, logger, pod, volumeName, kubeClient) if err != nil { return datapath.AccessPoint{}, errors.Wrapf(err, "error getting volume directory name for volume %s in pod %s", volumeName, pod.Name) } logger.WithField("volDir", volDir).Info("Got volume dir") - volMode, err := getVolumeMode(ctx, logger, pod, volumeName, cli) + volMode, err := getVolumeMode(ctx, logger, pod, volumeName, kubeClient) if err != nil { return datapath.AccessPoint{}, errors.Wrapf(err, "error getting volume mode for volume %s in pod %s", volumeName, pod.Name) } @@ -57,7 +59,7 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1api.Pod, volumeName st volSubDir = "volumeDevices" } - pathGlob := fmt.Sprintf("/host_pods/%s/%s/*/%s", string(pod.GetUID()), volSubDir, volDir) + pathGlob := fmt.Sprintf("%s/%s/%s/*/%s", nodeagent.HostPodVolumeMountPath(), string(pod.GetUID()), volSubDir, volDir) logger.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob") path, err := singlePathMatch(pathGlob, fs, logger) @@ -72,3 +74,22 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1api.Pod, volumeName st VolMode: volMode, }, nil } + +var getHostPodPath = nodeagent.GetHostPodPath + +func ExtractPodVolumeHostPath(ctx context.Context, path string, kubeClient kubernetes.Interface, veleroNamespace string, osType string) (string, error) { + podPath, err := getHostPodPath(ctx, kubeClient, veleroNamespace, 
osType) + if err != nil { + return "", errors.Wrap(err, "error getting host pod path from node-agent") + } + + if osType == kube.NodeOSWindows { + podPath = strings.Replace(podPath, "/", "\\", -1) + } + + if osType == kube.NodeOSWindows { + return strings.Replace(path, nodeagent.HostPodVolumeMountPathWin(), podPath, 1), nil + } else { + return strings.Replace(path, nodeagent.HostPodVolumeMountPath(), podPath, 1), nil + } +} diff --git a/pkg/exposer/host_path_test.go b/pkg/exposer/host_path_test.go index 9faff5f14..411833f64 100644 --- a/pkg/exposer/host_path_test.go +++ b/pkg/exposer/host_path_test.go @@ -18,26 +18,29 @@ package exposer import ( "context" + "fmt" "testing" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" corev1api "k8s.io/api/core/v1" - ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + "k8s.io/client-go/kubernetes" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/nodeagent" velerotest "github.com/vmware-tanzu/velero/pkg/test" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/filesystem" + "github.com/vmware-tanzu/velero/pkg/util/kube" ) func TestGetPodVolumeHostPath(t *testing.T) { tests := []struct { name string - getVolumeDirFunc func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (string, error) - getVolumeModeFunc func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (uploader.PersistentVolumeMode, error) + getVolumeDirFunc func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (string, error) + getVolumeModeFunc func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (uploader.PersistentVolumeMode, error) pathMatchFunc func(string, filesystem.Interface, logrus.FieldLogger) (string, error) pod *corev1api.Pod pvc string @@ -45,7 +48,7 @@ func TestGetPodVolumeHostPath(t *testing.T) { }{ { name: "get volume dir fail", - getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (string, error) { + getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (string, error) { return "", errors.New("fake-error-1") }, pod: builder.ForPod(velerov1api.DefaultNamespace, "fake-pod-1").Result(), @@ -54,10 +57,10 @@ func TestGetPodVolumeHostPath(t *testing.T) { }, { name: "single path match fail", - getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (string, error) { + getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (string, error) { return "", nil }, - getVolumeModeFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) (uploader.PersistentVolumeMode, error) { + getVolumeModeFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, kubernetes.Interface) (uploader.PersistentVolumeMode, error) { return uploader.PersistentVolumeFilesystem, nil }, pathMatchFunc: func(string, filesystem.Interface, logrus.FieldLogger) (string, error) { @@ -69,7 +72,7 @@ func TestGetPodVolumeHostPath(t *testing.T) { }, { name: "get block volume dir success", - getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, ctrlclient.Client) ( + getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1api.Pod, string, 
kubernetes.Interface) ( string, error) { return "fake-pvc-1", nil }, @@ -102,3 +105,58 @@ func TestGetPodVolumeHostPath(t *testing.T) { }) } } + +func TestExtractPodVolumeHostPath(t *testing.T) { + tests := []struct { + name string + getHostPodPathFunc func(context.Context, kubernetes.Interface, string, string) (string, error) + path string + osType string + expectedErr string + expected string + }{ + { + name: "get host pod path error", + getHostPodPathFunc: func(context.Context, kubernetes.Interface, string, string) (string, error) { + return "", errors.New("fake-error-1") + }, + + expectedErr: "error getting host pod path from node-agent: fake-error-1", + }, + { + name: "Windows os", + getHostPodPathFunc: func(context.Context, kubernetes.Interface, string, string) (string, error) { + return "/var/lib/kubelet/pods", nil + }, + path: fmt.Sprintf("\\%s\\pod-id-xxx\\volumes\\kubernetes.io~csi\\pvc-id-xxx\\mount", nodeagent.HostPodVolumeMountPoint), + osType: kube.NodeOSWindows, + expected: "\\var\\lib\\kubelet\\pods\\pod-id-xxx\\volumes\\kubernetes.io~csi\\pvc-id-xxx\\mount", + }, + { + name: "linux OS", + getHostPodPathFunc: func(context.Context, kubernetes.Interface, string, string) (string, error) { + return "/var/lib/kubelet/pods", nil + }, + path: fmt.Sprintf("/%s/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nodeagent.HostPodVolumeMountPoint), + osType: kube.NodeOSLinux, + expected: "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if test.getHostPodPathFunc != nil { + getHostPodPath = test.getHostPodPathFunc + } + + path, err := ExtractPodVolumeHostPath(context.Background(), test.path, nil, "", test.osType) + + if test.expectedErr != "" { + assert.EqualError(t, err, test.expectedErr) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expected, path) + } + }) + } +} diff --git a/pkg/exposer/pod_volume.go b/pkg/exposer/pod_volume.go new file mode 100644 index 000000000..b402ffecb --- /dev/null +++ b/pkg/exposer/pod_volume.go @@ -0,0 +1,392 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package exposer + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + corev1api "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/vmware-tanzu/velero/pkg/nodeagent" + "github.com/vmware-tanzu/velero/pkg/util/boolptr" + "github.com/vmware-tanzu/velero/pkg/util/filesystem" + "github.com/vmware-tanzu/velero/pkg/util/kube" +) + +const ( + PodVolumeExposeTypeBackup = "pod-volume-backup" + PodVolumeExposeTypeRestore = "pod-volume-restore" +) + +// PodVolumeExposeParam defines the input param for pod volume Expose +type PodVolumeExposeParam struct { + // ClientPodName is the name of the pod to be backed up or restored + ClientPodName string + + // ClientNamespace is the namespace to be backed up or restored + ClientNamespace string + + // ClientPodVolume is the pod volume for the client PVC + ClientPodVolume string + + // HostingPodLabels is the labels to be applied to the hosting pod + HostingPodLabels map[string]string + + // HostingPodAnnotations is the annotations to be applied to the hosting pod + HostingPodAnnotations map[string]string + + // Resources defines the resource requirements of the hosting pod + Resources corev1api.ResourceRequirements + + // OperationTimeout specifies the time to wait for resource operations in Expose + OperationTimeout time.Duration + + // Type specifies the type of the expose, either backup or restore + Type string +} + +// PodVolumeExposer is the interface for a pod volume exposer +type PodVolumeExposer interface { + // Expose starts the pod volume expose, the expose process may take a long time + Expose(context.Context, corev1api.ObjectReference, PodVolumeExposeParam) error + + // GetExposed polls the status of the expose. + // If the expose is accessible by the current caller, it waits for the expose to be ready and returns the expose result. + // Otherwise, it returns nil as the expose result without an error. + GetExposed(context.Context, corev1api.ObjectReference, client.Client, string, time.Duration) (*ExposeResult, error) + + // PeekExposed tests the status of the expose. + // If the expose is incomplete but not recoverable, it returns an error. + // Otherwise, it returns nil immediately. + PeekExposed(context.Context, corev1api.ObjectReference) error + + // DiagnoseExpose generates the diagnostic info when the expose is not finished for a long time. + // If it finds any problem, it returns a string describing the problem.
+ DiagnoseExpose(context.Context, corev1api.ObjectReference) string + + // CleanUp cleans up any objects generated during the restore expose + CleanUp(context.Context, corev1api.ObjectReference) +} + +// NewPodVolumeExposer creates a new instance of pod volume exposer +func NewPodVolumeExposer(kubeClient kubernetes.Interface, log logrus.FieldLogger) PodVolumeExposer { + return &podVolumeExposer{ + kubeClient: kubeClient, + fs: filesystem.NewFileSystem(), + log: log, + } +} + +type podVolumeExposer struct { + kubeClient kubernetes.Interface + fs filesystem.Interface + log logrus.FieldLogger +} + +var getPodVolumeHostPath = GetPodVolumeHostPath +var extractPodVolumeHostPath = ExtractPodVolumeHostPath + +func (e *podVolumeExposer) Expose(ctx context.Context, ownerObject corev1api.ObjectReference, param PodVolumeExposeParam) error { + curLog := e.log.WithFields(logrus.Fields{ + "owner": ownerObject.Name, + "client pod": param.ClientPodName, + "client pod volume": param.ClientPodVolume, + "client namespace": param.ClientNamespace, + "type": param.Type, + }) + + pod, err := e.kubeClient.CoreV1().Pods(param.ClientNamespace).Get(ctx, param.ClientPodName, metav1.GetOptions{}) + if err != nil { + return errors.Wrapf(err, "error getting client pod %s", param.ClientPodName) + } + + if pod.Spec.NodeName == "" { + return errors.Errorf("client pod %s doesn't have a node name", pod.Name) + } + + nodeOS, err := kube.GetNodeOS(ctx, pod.Spec.NodeName, e.kubeClient.CoreV1()) + if err != nil { + return errors.Wrapf(err, "error getting OS for node %s", pod.Spec.NodeName) + } + + curLog.Infof("Client pod is running in node %s, os %s", pod.Spec.NodeName, nodeOS) + + path, err := getPodVolumeHostPath(ctx, pod, param.ClientPodVolume, e.kubeClient, e.fs, e.log) + if err != nil { + return errors.Wrapf(err, "error to get pod volume path") + } + + path.ByPath, err = extractPodVolumeHostPath(ctx, path.ByPath, e.kubeClient, ownerObject.Namespace, nodeOS) + if err != nil { + return errors.Wrapf(err, "error to extract pod volume path") + } + + curLog.WithField("path", path).Infof("Host path is retrieved for pod %s, volume %s", param.ClientPodName, param.ClientPodVolume) + + hostingPod, err := e.createHostingPod(ctx, ownerObject, param.Type, path.ByPath, param.OperationTimeout, param.HostingPodLabels, param.HostingPodAnnotations, pod.Spec.NodeName, param.Resources, nodeOS) + if err != nil { + return errors.Wrapf(err, "error to create hosting pod") + } + + curLog.WithField("pod name", hostingPod.Name).Info("Hosting pod is created") + + return nil +} + +func (e *podVolumeExposer) GetExposed(ctx context.Context, ownerObject corev1api.ObjectReference, nodeClient client.Client, nodeName string, timeout time.Duration) (*ExposeResult, error) { + hostingPodName := ownerObject.Name + + containerName := string(ownerObject.UID) + volumeName := string(ownerObject.UID) + + curLog := e.log.WithFields(logrus.Fields{ + "owner": ownerObject.Name, + "node": nodeName, + }) + + var updated *corev1api.Pod + err := wait.PollUntilContextTimeout(ctx, 2*time.Second, timeout, true, func(ctx context.Context) (bool, error) { + pod := &corev1api.Pod{} + err := nodeClient.Get(ctx, types.NamespacedName{ + Namespace: ownerObject.Namespace, + Name: hostingPodName, + }, pod) + + if err != nil { + return false, errors.Wrapf(err, "error to get pod %s/%s", ownerObject.Namespace, hostingPodName) + } + + if pod.Status.Phase != corev1api.PodRunning { + return false, nil + } + + updated = pod + + return true, nil + }) + + if err != nil { + if 
apierrors.IsNotFound(err) { + curLog.WithField("hosting pod", hostingPodName).Debug("Hosting pod is not running in the current node") + return nil, nil + } else { + return nil, errors.Wrapf(err, "error to wait for rediness of pod %s", hostingPodName) + } + } + + curLog.WithField("pod", updated.Name).Infof("Hosting pod is in running state in node %s", updated.Spec.NodeName) + + return &ExposeResult{ByPod: ExposeByPod{ + HostingPod: updated, + HostingContainer: containerName, + VolumeName: volumeName, + }}, nil +} + +func (e *podVolumeExposer) PeekExposed(ctx context.Context, ownerObject corev1api.ObjectReference) error { + hostingPodName := ownerObject.Name + + curLog := e.log.WithFields(logrus.Fields{ + "owner": ownerObject.Name, + }) + + pod, err := e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(ctx, hostingPodName, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return nil + } + + if err != nil { + curLog.WithError(err).Warnf("error to peek hosting pod %s", hostingPodName) + return nil + } + + if podFailed, message := kube.IsPodUnrecoverable(pod, curLog); podFailed { + return errors.New(message) + } + + return nil +} + +func (e *podVolumeExposer) DiagnoseExpose(ctx context.Context, ownerObject corev1api.ObjectReference) string { + hostingPodName := ownerObject.Name + + diag := "begin diagnose pod volume exposer\n" + + pod, err := e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(ctx, hostingPodName, metav1.GetOptions{}) + if err != nil { + pod = nil + diag += fmt.Sprintf("error getting hosting pod %s, err: %v\n", hostingPodName, err) + } + + if pod != nil { + diag += kube.DiagnosePod(pod) + + if pod.Spec.NodeName != "" { + if err := nodeagent.KbClientIsRunningInNode(ctx, ownerObject.Namespace, pod.Spec.NodeName, e.kubeClient); err != nil { + diag += fmt.Sprintf("node-agent is not running in node %s, err: %v\n", pod.Spec.NodeName, err) + } + } + } + + diag += "end diagnose pod volume exposer" + + return diag +} + +func (e *podVolumeExposer) CleanUp(ctx context.Context, ownerObject corev1api.ObjectReference) { + restorePodName := ownerObject.Name + kube.DeletePodIfAny(ctx, e.kubeClient.CoreV1(), restorePodName, ownerObject.Namespace, e.log) +} + +func (e *podVolumeExposer) createHostingPod(ctx context.Context, ownerObject corev1api.ObjectReference, exposeType string, hostPath string, + operationTimeout time.Duration, label map[string]string, annotation map[string]string, selectedNode string, resources corev1api.ResourceRequirements, nodeOS string) (*corev1api.Pod, error) { + hostingPodName := ownerObject.Name + + containerName := string(ownerObject.UID) + clientVolumeName := string(ownerObject.UID) + clientVolumePath := "/" + clientVolumeName + + podInfo, err := getInheritedPodInfo(ctx, e.kubeClient, ownerObject.Namespace, nodeOS) + if err != nil { + return nil, errors.Wrap(err, "error to get inherited pod info from node-agent") + } + + var gracePeriod int64 + mountPropagation := corev1api.MountPropagationHostToContainer + volumeMounts := []corev1api.VolumeMount{{ + Name: clientVolumeName, + MountPath: clientVolumePath, + MountPropagation: &mountPropagation, + }} + volumeMounts = append(volumeMounts, podInfo.volumeMounts...) + + volumes := []corev1api.Volume{{ + Name: clientVolumeName, + VolumeSource: corev1api.VolumeSource{ + HostPath: &corev1api.HostPathVolumeSource{ + Path: hostPath, + }, + }, + }} + volumes = append(volumes, podInfo.volumes...) 
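// Editor's sketch (not part of this patch): a hypothetical caller of the PodVolumeExposer
// defined above, showing the intended lifecycle for a PodVolumeBackup — Expose the client
// pod's volume (which creates the hosting pod over the volume's host path), then poll
// GetExposed for the hosting pod on this node. Field values and the 30-minute timeout are
// illustrative assumptions; the PVB/PVR controller changes shown earlier in this patch
// still call GetPodVolumeHostPath directly rather than this exposer.
package example

import (
	"context"
	"time"

	corev1api "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	"github.com/vmware-tanzu/velero/pkg/exposer"
)

func exposeForBackup(ctx context.Context, pvExposer exposer.PodVolumeExposer, nodeClient client.Client, nodeName string, pvb *velerov1api.PodVolumeBackup) (*exposer.ExposeResult, error) {
	owner := corev1api.ObjectReference{
		Kind:       pvb.Kind,
		Namespace:  pvb.Namespace,
		Name:       pvb.Name,
		UID:        pvb.UID,
		APIVersion: pvb.APIVersion,
	}

	param := exposer.PodVolumeExposeParam{
		ClientPodName:    pvb.Spec.Pod.Name,
		ClientNamespace:  pvb.Spec.Pod.Namespace,
		ClientPodVolume:  pvb.Spec.Volume,
		Type:             exposer.PodVolumeExposeTypeBackup,
		OperationTimeout: 30 * time.Minute, // illustrative value
	}

	// Expose creates the hosting pod that mounts the client volume's host path.
	if err := pvExposer.Expose(ctx, owner, param); err != nil {
		return nil, err
	}

	// GetExposed waits for the hosting pod to reach Running; it returns (nil, nil)
	// when the hosting pod is not visible through this node's client.
	return pvExposer.GetExposed(ctx, owner, nodeClient, nodeName, param.OperationTimeout)
}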
+ + args := []string{ + fmt.Sprintf("--volume-path=%s", clientVolumePath), + fmt.Sprintf("--resource-timeout=%s", operationTimeout.String()), + } + + command := []string{ + "/velero", + "pod-volume", + } + + if exposeType == PodVolumeExposeTypeBackup { + args = append(args, fmt.Sprintf("--pod-volume-backup=%s", ownerObject.Name)) + command = append(command, "backup") + } else { + args = append(args, fmt.Sprintf("--pod-volume-restore=%s", ownerObject.Name)) + command = append(command, "restore") + } + + args = append(args, podInfo.logFormatArgs...) + args = append(args, podInfo.logLevelArgs...) + + var securityCtx *corev1api.PodSecurityContext + nodeSelector := map[string]string{} + podOS := corev1api.PodOS{} + toleration := []corev1api.Toleration{} + if nodeOS == kube.NodeOSWindows { + userID := "ContainerAdministrator" + securityCtx = &corev1api.PodSecurityContext{ + WindowsOptions: &corev1api.WindowsSecurityContextOptions{ + RunAsUserName: &userID, + }, + } + + nodeSelector[kube.NodeOSLabel] = kube.NodeOSWindows + podOS.Name = kube.NodeOSWindows + + toleration = append(toleration, corev1api.Toleration{ + Key: "os", + Operator: "Equal", + Effect: "NoSchedule", + Value: "windows", + }) + } else { + userID := int64(0) + securityCtx = &corev1api.PodSecurityContext{ + RunAsUser: &userID, + } + + nodeSelector[kube.NodeOSLabel] = kube.NodeOSLinux + podOS.Name = kube.NodeOSLinux + } + + pod := &corev1api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: hostingPodName, + Namespace: ownerObject.Namespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: ownerObject.APIVersion, + Kind: ownerObject.Kind, + Name: ownerObject.Name, + UID: ownerObject.UID, + Controller: boolptr.True(), + }, + }, + Labels: label, + Annotations: annotation, + }, + Spec: corev1api.PodSpec{ + NodeSelector: nodeSelector, + OS: &podOS, + Containers: []corev1api.Container{ + { + Name: containerName, + Image: podInfo.image, + ImagePullPolicy: corev1api.PullNever, + Command: command, + Args: args, + VolumeMounts: volumeMounts, + Env: podInfo.env, + EnvFrom: podInfo.envFrom, + Resources: resources, + }, + }, + ServiceAccountName: podInfo.serviceAccount, + TerminationGracePeriodSeconds: &gracePeriod, + Volumes: volumes, + NodeName: selectedNode, + RestartPolicy: corev1api.RestartPolicyNever, + SecurityContext: securityCtx, + Tolerations: toleration, + }, + } + + return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{}) +} diff --git a/pkg/exposer/pod_volume_test.go b/pkg/exposer/pod_volume_test.go new file mode 100644 index 000000000..aac24990f --- /dev/null +++ b/pkg/exposer/pod_volume_test.go @@ -0,0 +1,591 @@ +package exposer + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + appsv1api "k8s.io/api/apps/v1" + corev1api "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake" + + velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/datapath" + velerotest "github.com/vmware-tanzu/velero/pkg/test" + "github.com/vmware-tanzu/velero/pkg/util/filesystem" +) + +func TestPodVolumeExpose(t *testing.T) { + backup := &velerov1.Backup{ + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov1.SchemeGroupVersion.String(), + Kind: "Backup", + }, + 
ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-backup", + UID: "fake-uid", + }, + } + + podWithNoNode := builder.ForPod("fake-ns", "fake-client-pod").Result() + podWithNode := builder.ForPod("fake-ns", "fake-client-pod").NodeName("fake-node").Result() + + node := builder.ForNode("fake-node").Result() + + daemonSet := &appsv1api.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "node-agent", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + APIVersion: appsv1api.SchemeGroupVersion.String(), + }, + Spec: appsv1api.DaemonSetSpec{ + Template: corev1api.PodTemplateSpec{ + Spec: corev1api.PodSpec{ + Containers: []corev1api.Container{ + { + Name: "node-agent", + }, + }, + }, + }, + }, + } + + tests := []struct { + name string + snapshotClientObj []runtime.Object + kubeClientObj []runtime.Object + ownerBackup *velerov1.Backup + exposeParam PodVolumeExposeParam + funcGetPodVolumeHostPath func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) + funcExtractPodVolumeHostPath func(context.Context, string, kubernetes.Interface, string, string) (string, error) + err string + }{ + { + name: "get client pod fail", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + }, + err: "error getting client pod fake-client-pod: pods \"fake-client-pod\" not found", + }, + { + name: "client pod with no node name", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + }, + kubeClientObj: []runtime.Object{ + podWithNoNode, + }, + err: "client pod fake-client-pod doesn't have a node name", + }, + { + name: "get node os fail", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + }, + kubeClientObj: []runtime.Object{ + podWithNode, + }, + err: "error getting OS for node fake-node: error getting node fake-node: nodes \"fake-node\" not found", + }, + { + name: "get pod volume path fail", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + ClientPodVolume: "fake-client-volume", + }, + kubeClientObj: []runtime.Object{ + podWithNode, + node, + }, + funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) { + return datapath.AccessPoint{}, errors.New("fake-get-pod-volume-path-error") + }, + err: "error to get pod volume path: fake-get-pod-volume-path-error", + }, + { + name: "extract pod volume path fail", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + ClientPodVolume: "fake-client-volume", + }, + kubeClientObj: []runtime.Object{ + podWithNode, + node, + }, + funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) { + return datapath.AccessPoint{ + ByPath: "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", + }, nil + }, + funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) { + return "", errors.New("fake-extract-error") + }, + err: "error to extract pod volume path: fake-extract-error", + }, + { + 
name: "create hosting pod fail", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + ClientPodVolume: "fake-client-volume", + }, + kubeClientObj: []runtime.Object{ + podWithNode, + node, + }, + funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) { + return datapath.AccessPoint{ + ByPath: "/host_pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", + }, nil + }, + funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) { + return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil + }, + err: "error to create hosting pod: error to get inherited pod info from node-agent: error to get node-agent pod template: error to get node-agent daemonset: daemonsets.apps \"node-agent\" not found", + }, + { + name: "succeed", + ownerBackup: backup, + exposeParam: PodVolumeExposeParam{ + ClientNamespace: "fake-ns", + ClientPodName: "fake-client-pod", + ClientPodVolume: "fake-client-volume", + }, + kubeClientObj: []runtime.Object{ + podWithNode, + node, + daemonSet, + }, + funcGetPodVolumeHostPath: func(context.Context, *corev1api.Pod, string, kubernetes.Interface, filesystem.Interface, logrus.FieldLogger) (datapath.AccessPoint, error) { + return datapath.AccessPoint{ + ByPath: "/host_pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", + }, nil + }, + funcExtractPodVolumeHostPath: func(context.Context, string, kubernetes.Interface, string, string) (string, error) { + return "/var/lib/kubelet/pods/pod-id-xxx/volumes/kubernetes.io~csi/pvc-id-xxx/mount", nil + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) 
+ + exposer := podVolumeExposer{ + kubeClient: fakeKubeClient, + log: velerotest.NewLogger(), + } + + var ownerObject corev1api.ObjectReference + if test.ownerBackup != nil { + ownerObject = corev1api.ObjectReference{ + Kind: test.ownerBackup.Kind, + Namespace: test.ownerBackup.Namespace, + Name: test.ownerBackup.Name, + UID: test.ownerBackup.UID, + APIVersion: test.ownerBackup.APIVersion, + } + } + + if test.funcGetPodVolumeHostPath != nil { + getPodVolumeHostPath = test.funcGetPodVolumeHostPath + } + + if test.funcExtractPodVolumeHostPath != nil { + extractPodVolumeHostPath = test.funcExtractPodVolumeHostPath + } + + err := exposer.Expose(context.Background(), ownerObject, test.exposeParam) + if err == nil { + assert.NoError(t, err) + + _, err = exposer.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(context.Background(), ownerObject.Name, metav1.GetOptions{}) + assert.NoError(t, err) + } else { + assert.EqualError(t, err, test.err) + } + }) + } +} + +func TestGetPodVolumeExpose(t *testing.T) { + backup := &velerov1.Backup{ + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov1.SchemeGroupVersion.String(), + Kind: "Backup", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-backup", + UID: "fake-uid", + }, + } + + backupPodNotRunning := builder.ForPod(backup.Namespace, backup.Name).Result() + backupPodRunning := builder.ForPod(backup.Namespace, backup.Name).Phase(corev1api.PodRunning).Result() + + scheme := runtime.NewScheme() + corev1api.AddToScheme(scheme) + + tests := []struct { + name string + kubeClientObj []runtime.Object + ownerBackup *velerov1.Backup + nodeName string + Timeout time.Duration + err string + expectedResult *ExposeResult + }{ + { + name: "backup pod is not found", + ownerBackup: backup, + nodeName: "fake-node", + }, + { + name: "wait backup pod running fail", + ownerBackup: backup, + nodeName: "fake-node", + kubeClientObj: []runtime.Object{ + backupPodNotRunning, + }, + Timeout: time.Second, + err: "error to wait for rediness of pod fake-backup: context deadline exceeded", + }, + { + name: "succeed", + ownerBackup: backup, + nodeName: "fake-node", + kubeClientObj: []runtime.Object{ + backupPodRunning, + }, + Timeout: time.Second, + expectedResult: &ExposeResult{ + ByPod: ExposeByPod{ + HostingPod: backupPodRunning, + VolumeName: string(backup.UID), + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) 
+ + fakeClientBuilder := clientFake.NewClientBuilder() + fakeClientBuilder = fakeClientBuilder.WithScheme(scheme) + + fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build() + + exposer := podVolumeExposer{ + kubeClient: fakeKubeClient, + log: velerotest.NewLogger(), + } + + var ownerObject corev1api.ObjectReference + if test.ownerBackup != nil { + ownerObject = corev1api.ObjectReference{ + Kind: test.ownerBackup.Kind, + Namespace: test.ownerBackup.Namespace, + Name: test.ownerBackup.Name, + UID: test.ownerBackup.UID, + APIVersion: test.ownerBackup.APIVersion, + } + } + + result, err := exposer.GetExposed(context.Background(), ownerObject, fakeClient, test.nodeName, test.Timeout) + if test.err == "" { + assert.NoError(t, err) + + if test.expectedResult == nil { + assert.Nil(t, result) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expectedResult.ByPod.VolumeName, result.ByPod.VolumeName) + assert.Equal(t, test.expectedResult.ByPod.HostingPod.Name, result.ByPod.HostingPod.Name) + } + } else { + assert.EqualError(t, err, test.err) + } + }) + } +} + +func TestPodVolumePeekExpose(t *testing.T) { + backup := &velerov1.Backup{ + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov1.SchemeGroupVersion.String(), + Kind: "Backup", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-backup", + UID: "fake-uid", + }, + } + + backupPodUrecoverable := &corev1api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: backup.Namespace, + Name: backup.Name, + }, + Status: corev1api.PodStatus{ + Phase: corev1api.PodFailed, + }, + } + + backupPod := &corev1api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: backup.Namespace, + Name: backup.Name, + }, + } + + scheme := runtime.NewScheme() + corev1api.AddToScheme(scheme) + + tests := []struct { + name string + kubeClientObj []runtime.Object + ownerBackup *velerov1.Backup + err string + }{ + { + name: "backup pod is not found", + ownerBackup: backup, + }, + { + name: "pod is unrecoverable", + ownerBackup: backup, + kubeClientObj: []runtime.Object{ + backupPodUrecoverable, + }, + err: "Pod is in abnormal state [Failed], message []", + }, + { + name: "succeed", + ownerBackup: backup, + kubeClientObj: []runtime.Object{ + backupPod, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) 
+ + exposer := podVolumeExposer{ + kubeClient: fakeKubeClient, + log: velerotest.NewLogger(), + } + + var ownerObject corev1api.ObjectReference + if test.ownerBackup != nil { + ownerObject = corev1api.ObjectReference{ + Kind: test.ownerBackup.Kind, + Namespace: test.ownerBackup.Namespace, + Name: test.ownerBackup.Name, + UID: test.ownerBackup.UID, + APIVersion: test.ownerBackup.APIVersion, + } + } + + err := exposer.PeekExposed(context.Background(), ownerObject) + if test.err == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, test.err) + } + }) + } +} + +func TestPodVolumeDiagnoseExpose(t *testing.T) { + backup := &velerov1.Backup{ + TypeMeta: metav1.TypeMeta{ + APIVersion: velerov1.SchemeGroupVersion.String(), + Kind: "Backup", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-backup", + UID: "fake-uid", + }, + } + + backupPodWithoutNodeName := corev1api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-backup", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: backup.APIVersion, + Kind: backup.Kind, + Name: backup.Name, + UID: backup.UID, + }, + }, + }, + Status: corev1api.PodStatus{ + Phase: corev1api.PodPending, + Conditions: []corev1api.PodCondition{ + { + Type: corev1api.PodInitialized, + Status: corev1api.ConditionTrue, + Message: "fake-pod-message", + }, + }, + }, + } + + backupPodWithNodeName := corev1api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "fake-backup", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: backup.APIVersion, + Kind: backup.Kind, + Name: backup.Name, + UID: backup.UID, + }, + }, + }, + Spec: corev1api.PodSpec{ + NodeName: "fake-node", + }, + Status: corev1api.PodStatus{ + Phase: corev1api.PodPending, + Conditions: []corev1api.PodCondition{ + { + Type: corev1api.PodInitialized, + Status: corev1api.ConditionTrue, + Message: "fake-pod-message", + }, + }, + }, + } + + nodeAgentPod := corev1api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1.DefaultNamespace, + Name: "node-agent-pod-1", + Labels: map[string]string{"role": "node-agent"}, + }, + Spec: corev1api.PodSpec{ + NodeName: "fake-node", + }, + Status: corev1api.PodStatus{ + Phase: corev1api.PodRunning, + }, + } + + tests := []struct { + name string + ownerBackup *velerov1.Backup + kubeClientObj []runtime.Object + snapshotClientObj []runtime.Object + expected string + }{ + { + name: "no pod", + ownerBackup: backup, + expected: `begin diagnose pod volume exposer +error getting hosting pod fake-backup, err: pods "fake-backup" not found +end diagnose pod volume exposer`, + }, + { + name: "pod without node name, pvc without volume name, vs without status", + ownerBackup: backup, + kubeClientObj: []runtime.Object{ + &backupPodWithoutNodeName, + }, + expected: `begin diagnose pod volume exposer +Pod velero/fake-backup, phase Pending, node name +Pod condition Initialized, status True, reason , message fake-pod-message +end diagnose pod volume exposer`, + }, + { + name: "pod without node name", + ownerBackup: backup, + kubeClientObj: []runtime.Object{ + &backupPodWithoutNodeName, + }, + expected: `begin diagnose pod volume exposer +Pod velero/fake-backup, phase Pending, node name +Pod condition Initialized, status True, reason , message fake-pod-message +end diagnose pod volume exposer`, + }, + { + name: "pod with node name, no node agent", + ownerBackup: backup, + kubeClientObj: []runtime.Object{ + &backupPodWithNodeName, + }, + 
expected: `begin diagnose pod volume exposer +Pod velero/fake-backup, phase Pending, node name fake-node +Pod condition Initialized, status True, reason , message fake-pod-message +node-agent is not running in node fake-node, err: daemonset pod not found in running state in node fake-node +end diagnose pod volume exposer`, + }, + { + name: "pod with node name, node agent is running", + ownerBackup: backup, + kubeClientObj: []runtime.Object{ + &backupPodWithNodeName, + &nodeAgentPod, + }, + expected: `begin diagnose pod volume exposer +Pod velero/fake-backup, phase Pending, node name fake-node +Pod condition Initialized, status True, reason , message fake-pod-message +end diagnose pod volume exposer`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(tt.kubeClientObj...) + e := &podVolumeExposer{ + kubeClient: fakeKubeClient, + log: velerotest.NewLogger(), + } + var ownerObject corev1api.ObjectReference + if tt.ownerBackup != nil { + ownerObject = corev1api.ObjectReference{ + Kind: tt.ownerBackup.Kind, + Namespace: tt.ownerBackup.Namespace, + Name: tt.ownerBackup.Name, + UID: tt.ownerBackup.UID, + APIVersion: tt.ownerBackup.APIVersion, + } + } + + diag := e.DiagnoseExpose(context.Background(), ownerObject) + assert.Equal(t, tt.expected, diag) + }) + } +} diff --git a/pkg/install/daemonset.go b/pkg/install/daemonset.go index 1dee41a33..7eaa2a094 100644 --- a/pkg/install/daemonset.go +++ b/pkg/install/daemonset.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/vmware-tanzu/velero/internal/velero" + "github.com/vmware-tanzu/velero/pkg/nodeagent" ) func DaemonSet(namespace string, opts ...podTemplateOption) *appsv1api.DaemonSet { @@ -126,8 +127,8 @@ func DaemonSet(namespace string, opts ...podTemplateOption) *appsv1api.DaemonSet }, VolumeMounts: []corev1api.VolumeMount{ { - Name: "host-pods", - MountPath: "/host_pods", + Name: nodeagent.HostPodVolumeMount, + MountPath: nodeagent.HostPodVolumeMountPath(), MountPropagation: &mountPropagationMode, }, { diff --git a/pkg/nodeagent/node_agent.go b/pkg/nodeagent/node_agent.go index 34b782cbc..3d1159085 100644 --- a/pkg/nodeagent/node_agent.go +++ b/pkg/nodeagent/node_agent.go @@ -41,6 +41,12 @@ const ( // nodeAgentRole marks pods with node-agent role on all nodes. 
nodeAgentRole = "node-agent" + + // HostPodVolumeMount is the name of the volume in node-agent for host-pod mount + HostPodVolumeMount = "host-pods" + + // HostPodVolumeMountPoint is the mount point of the volume in node-agent for host-pod mount + HostPodVolumeMountPoint = "host_pods" ) var ( @@ -249,3 +255,45 @@ func GetAnnotationValue(ctx context.Context, kubeClient kubernetes.Interface, na return val, nil } + +func GetHostPodPath(ctx context.Context, kubeClient kubernetes.Interface, namespace string, osType string) (string, error) { + dsName := daemonSet + if osType == kube.NodeOSWindows { + dsName = daemonsetWindows + } + + ds, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, dsName, metav1.GetOptions{}) + if err != nil { + return "", errors.Wrapf(err, "error getting daemonset %s", dsName) + } + + var volume *corev1api.Volume + for _, v := range ds.Spec.Template.Spec.Volumes { + if v.Name == HostPodVolumeMount { + volume = &v + break + } + } + + if volume == nil { + return "", errors.New("host pod volume is not found") + } + + if volume.HostPath == nil { + return "", errors.New("host pod volume is not a host path volume") + } + + if volume.HostPath.Path == "" { + return "", errors.New("host pod volume path is empty") + } + + return volume.HostPath.Path, nil +} + +func HostPodVolumeMountPath() string { + return "/" + HostPodVolumeMountPoint +} + +func HostPodVolumeMountPathWin() string { + return "\\" + HostPodVolumeMountPoint +} diff --git a/pkg/nodeagent/node_agent_test.go b/pkg/nodeagent/node_agent_test.go index 9a78ca033..8f94b1ff6 100644 --- a/pkg/nodeagent/node_agent_test.go +++ b/pkg/nodeagent/node_agent_test.go @@ -590,3 +590,164 @@ func TestGetAnnotationValue(t *testing.T) { }) } } + +func TestGetHostPodPath(t *testing.T) { + daemonSet := &appsv1api.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-ns", + Name: "node-agent", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + }, + } + + daemonSetWithHostPodVolume := &appsv1api.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-ns", + Name: "node-agent", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + }, + Spec: appsv1api.DaemonSetSpec{ + Template: corev1api.PodTemplateSpec{ + Spec: corev1api.PodSpec{ + Volumes: []corev1api.Volume{ + { + Name: HostPodVolumeMount, + }, + }, + }, + }, + }, + } + + daemonSetWithHostPodVolumeAndEmptyPath := &appsv1api.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-ns", + Name: "node-agent", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + }, + Spec: appsv1api.DaemonSetSpec{ + Template: corev1api.PodTemplateSpec{ + Spec: corev1api.PodSpec{ + Volumes: []corev1api.Volume{ + { + Name: HostPodVolumeMount, + VolumeSource: corev1api.VolumeSource{ + HostPath: &corev1api.HostPathVolumeSource{}, + }, + }, + }, + }, + }, + }, + } + + daemonSetWithHostPodVolumeAndValidPath := &appsv1api.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-ns", + Name: "node-agent", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + }, + Spec: appsv1api.DaemonSetSpec{ + Template: corev1api.PodTemplateSpec{ + Spec: corev1api.PodSpec{ + Volumes: []corev1api.Volume{ + { + Name: HostPodVolumeMount, + VolumeSource: corev1api.VolumeSource{ + HostPath: &corev1api.HostPathVolumeSource{ + Path: "/var/lib/kubelet/pods", + }, + }, + }, + }, + }, + }, + }, + } + + tests := []struct { + name string + kubeClientObj []runtime.Object + namespace string + osType string + expectedValue string + expectErr string + }{ + { + name: "ds get error", + namespace: 
"fake-ns", + osType: kube.NodeOSWindows, + kubeClientObj: []runtime.Object{ + daemonSet, + }, + expectErr: "error getting daemonset node-agent-windows: daemonsets.apps \"node-agent-windows\" not found", + }, + { + name: "no host pod volume", + namespace: "fake-ns", + osType: kube.NodeOSLinux, + kubeClientObj: []runtime.Object{ + daemonSet, + }, + expectErr: "host pod volume is not found", + }, + { + name: "no host pod volume path", + namespace: "fake-ns", + osType: kube.NodeOSLinux, + kubeClientObj: []runtime.Object{ + daemonSetWithHostPodVolume, + }, + expectErr: "host pod volume is not a host path volume", + }, + { + name: "empty host pod volume path", + namespace: "fake-ns", + osType: kube.NodeOSLinux, + kubeClientObj: []runtime.Object{ + daemonSetWithHostPodVolumeAndEmptyPath, + }, + expectErr: "host pod volume path is empty", + }, + { + name: "succeed", + namespace: "fake-ns", + osType: kube.NodeOSLinux, + kubeClientObj: []runtime.Object{ + daemonSetWithHostPodVolumeAndValidPath, + }, + expectedValue: "/var/lib/kubelet/pods", + }, + { + name: "succeed on empty os type", + namespace: "fake-ns", + kubeClientObj: []runtime.Object{ + daemonSetWithHostPodVolumeAndValidPath, + }, + expectedValue: "/var/lib/kubelet/pods", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) + + path, err := GetHostPodPath(context.TODO(), fakeKubeClient, test.namespace, test.osType) + + if test.expectErr == "" { + assert.NoError(t, err) + assert.Equal(t, test.expectedValue, path) + } else { + assert.EqualError(t, err, test.expectErr) + } + }) + } +} diff --git a/pkg/util/kube/utils.go b/pkg/util/kube/utils.go index 5d64f117b..eab3193fc 100644 --- a/pkg/util/kube/utils.go +++ b/pkg/util/kube/utils.go @@ -25,7 +25,6 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1api "k8s.io/api/core/v1" - storagev1api "k8s.io/api/storage/v1" apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -33,8 +32,8 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - "sigs.k8s.io/controller-runtime/pkg/client" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/label" @@ -148,8 +147,8 @@ func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client core // GetVolumeDirectory gets the name of the directory on the host, under /var/lib/kubelet/pods//volumes/, // where the specified volume lives. // For volumes with a CSIVolumeSource, append "/mount" to the directory name. -func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) (string, error) { - pvc, pv, volume, err := GetPodPVCVolume(ctx, log, pod, volumeName, cli) +func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, kubeClient kubernetes.Interface) (string, error) { + pvc, pv, volume, err := GetPodPVCVolume(ctx, log, pod, volumeName, kubeClient) if err != nil { // This case implies the administrator created the PV and attached it directly, without PVC. 
// Note that only one VolumeSource can be populated per Volume on a pod @@ -164,7 +163,7 @@ func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1 // Most common case is that we have a PVC VolumeSource, and we need to check the PV it points to for a CSI source. // PV's been created with a CSI source. - isProvisionedByCSI, err := isProvisionedByCSI(log, pv, cli) + isProvisionedByCSI, err := isProvisionedByCSI(log, pv, kubeClient) if err != nil { return "", errors.WithStack(err) } @@ -179,9 +178,9 @@ func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1 } // GetVolumeMode gets the uploader.PersistentVolumeMode of the volume. -func GetVolumeMode(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) ( +func GetVolumeMode(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, kubeClient kubernetes.Interface) ( uploader.PersistentVolumeMode, error) { - _, pv, _, err := GetPodPVCVolume(ctx, log, pod, volumeName, cli) + _, pv, _, err := GetPodPVCVolume(ctx, log, pod, volumeName, kubeClient) if err != nil { if err == ErrorPodVolumeIsNotPVC { @@ -198,7 +197,7 @@ func GetVolumeMode(ctx context.Context, log logrus.FieldLogger, pod *corev1api.P // GetPodPVCVolume gets the PVC, PV and volume for a pod volume name. // Returns pod volume in case of ErrorPodVolumeIsNotPVC error -func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) ( +func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, kubeClient kubernetes.Interface) ( *corev1api.PersistentVolumeClaim, *corev1api.PersistentVolume, *corev1api.Volume, error) { var volume *corev1api.Volume @@ -217,14 +216,12 @@ func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api return nil, nil, volume, ErrorPodVolumeIsNotPVC // There is a pod volume but it is not a PVC } - pvc := &corev1api.PersistentVolumeClaim{} - err := cli.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: volume.VolumeSource.PersistentVolumeClaim.ClaimName}, pvc) + pvc, err := kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Get(ctx, volume.VolumeSource.PersistentVolumeClaim.ClaimName, metav1.GetOptions{}) if err != nil { return nil, nil, nil, errors.WithStack(err) } - pv := &corev1api.PersistentVolume{} - err = cli.Get(ctx, client.ObjectKey{Name: pvc.Spec.VolumeName}, pv) + pv, err := kubeClient.CoreV1().PersistentVolumes().Get(ctx, pvc.Spec.VolumeName, metav1.GetOptions{}) if err != nil { return nil, nil, nil, errors.WithStack(err) } @@ -235,7 +232,7 @@ func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api // isProvisionedByCSI function checks whether this is a CSI PV by annotation. // Either "pv.kubernetes.io/provisioned-by" or "pv.kubernetes.io/migrated-to" indicates // PV is provisioned by CSI. 
-func isProvisionedByCSI(log logrus.FieldLogger, pv *corev1api.PersistentVolume, kbClient client.Client) (bool, error) { +func isProvisionedByCSI(log logrus.FieldLogger, pv *corev1api.PersistentVolume, kubeClient kubernetes.Interface) (bool, error) { if pv.Spec.CSI != nil { return true, nil } @@ -245,10 +242,11 @@ func isProvisionedByCSI(log logrus.FieldLogger, pv *corev1api.PersistentVolume, driverName := pv.Annotations[KubeAnnDynamicallyProvisioned] migratedDriver := pv.Annotations[KubeAnnMigratedTo] if len(driverName) > 0 || len(migratedDriver) > 0 { - list := &storagev1api.CSIDriverList{} - if err := kbClient.List(context.TODO(), list); err != nil { + list, err := kubeClient.StorageV1().CSIDrivers().List(context.TODO(), metav1.ListOptions{}) + if err != nil { return false, err } + for _, driver := range list.Items { if driverName == driver.Name || migratedDriver == driver.Name { log.Debugf("the annotation %s or %s equals to %s indicates the volume is provisioned by a CSI driver", KubeAnnDynamicallyProvisioned, KubeAnnMigratedTo, driver.Name) diff --git a/pkg/util/kube/utils_test.go b/pkg/util/kube/utils_test.go index b36ab8ef2..37f990778 100644 --- a/pkg/util/kube/utils_test.go +++ b/pkg/util/kube/utils_test.go @@ -34,11 +34,12 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "sigs.k8s.io/controller-runtime/pkg/client/fake" "github.com/vmware-tanzu/velero/pkg/builder" velerotest "github.com/vmware-tanzu/velero/pkg/test" "github.com/vmware-tanzu/velero/pkg/uploader" + + "k8s.io/client-go/kubernetes/fake" ) func TestNamespaceAndName(t *testing.T) { @@ -216,17 +217,18 @@ func TestGetVolumeDirectorySuccess(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "csi.test.com"}, } for _, tc := range tests { - clientBuilder := fake.NewClientBuilder().WithLists(&storagev1api.CSIDriverList{Items: []storagev1api.CSIDriver{csiDriver}}) - + objs := []runtime.Object{&csiDriver} if tc.pvc != nil { - clientBuilder = clientBuilder.WithObjects(tc.pvc) + objs = append(objs, tc.pvc) } if tc.pv != nil { - clientBuilder = clientBuilder.WithObjects(tc.pv) + objs = append(objs, tc.pv) } + fakeKubeClient := fake.NewSimpleClientset(objs...) + // Function under test - dir, err := GetVolumeDirectory(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, clientBuilder.Build()) + dir, err := GetVolumeDirectory(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, fakeKubeClient) require.NoError(t, err) assert.Equal(t, tc.want, dir) @@ -264,17 +266,18 @@ func TestGetVolumeModeSuccess(t *testing.T) { } for _, tc := range tests { - clientBuilder := fake.NewClientBuilder() - + objs := []runtime.Object{} if tc.pvc != nil { - clientBuilder = clientBuilder.WithObjects(tc.pvc) + objs = append(objs, tc.pvc) } if tc.pv != nil { - clientBuilder = clientBuilder.WithObjects(tc.pv) + objs = append(objs, tc.pv) } + fakeKubeClient := fake.NewSimpleClientset(objs...) + // Function under test - mode, err := GetVolumeMode(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, clientBuilder.Build()) + mode, err := GetVolumeMode(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, fakeKubeClient) require.NoError(t, err) assert.Equal(t, tc.want, mode)
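// Editor's sketch (not part of this patch): how a path under the node-agent's /host_pods
// mount maps back to the real kubelet path on the host. ExtractPodVolumeHostPath (added in
// pkg/exposer/host_path.go above) uses GetHostPodPath to read the hostPath source of the
// "host-pods" volume from the node-agent daemonset and swaps the mount prefix for it.
// The pod/PVC IDs, the Linux OS type, and the /var/lib/kubelet/pods default are
// illustrative assumptions mirroring the tests in this diff.
package example

import (
	"context"

	"k8s.io/client-go/kubernetes"

	"github.com/vmware-tanzu/velero/pkg/exposer"
	"github.com/vmware-tanzu/velero/pkg/util/kube"
)

func hostPathForVolume(ctx context.Context, kubeClient kubernetes.Interface, veleroNamespace string) (string, error) {
	// Path as seen inside the node-agent container, e.g. as produced by GetPodVolumeHostPath.
	inContainer := "/host_pods/pod-uid-0000/volumes/kubernetes.io~csi/pvc-uid-0000/mount"

	// With a default install the daemonset's host-pods volume points at /var/lib/kubelet/pods,
	// so the result would be /var/lib/kubelet/pods/pod-uid-0000/volumes/kubernetes.io~csi/pvc-uid-0000/mount.
	return exposer.ExtractPodVolumeHostPath(ctx, inContainer, kubeClient, veleroNamespace, kube.NodeOSLinux)
}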