diff --git a/changelogs/unreleased/7059-Lyndon-Li b/changelogs/unreleased/7059-Lyndon-Li new file mode 100644 index 000000000..77b3a1765 --- /dev/null +++ b/changelogs/unreleased/7059-Lyndon-Li @@ -0,0 +1 @@ +Add the implementation for design #6950, configurable data path concurrency \ No newline at end of file diff --git a/changelogs/unreleased/7061-blackpiglet b/changelogs/unreleased/7061-blackpiglet new file mode 100644 index 000000000..ac965ed13 --- /dev/null +++ b/changelogs/unreleased/7061-blackpiglet @@ -0,0 +1 @@ +Add DataUpload Result and CSI VolumeSnapshot check for restore PV. \ No newline at end of file diff --git a/changelogs/unreleased/7072-ywk253100 b/changelogs/unreleased/7072-ywk253100 new file mode 100644 index 000000000..2a6faffe3 --- /dev/null +++ b/changelogs/unreleased/7072-ywk253100 @@ -0,0 +1 @@ +Truncate the credential file to avoid the change of secret content messing it up \ No newline at end of file diff --git a/design/node-agent-concurrency.md b/design/node-agent-concurrency.md index c3d84e69e..71dd33e21 100644 --- a/design/node-agent-concurrency.md +++ b/design/node-agent-concurrency.md @@ -28,10 +28,15 @@ Therefore, in order to gain the optimized performance with the limited resources We introduce a configMap named ```node-agent-configs``` for users to specify the node-agent related configurations. This configMap is not created by Velero, users should create it manually on demand. The configMap should be in the same namespace where Velero is installed. If multiple Velero instances are installed in different namespaces, there should be one configMap in each namespace which applies to node-agent in that namespace only. Node-agent server checks these configurations at startup time and use it to initiate the related VGDP modules. Therefore, users could edit this configMap any time, but in order to make the changes effective, node-agent server needs to be restarted. 
-The ```node-agent-configs``` configMap may be used for other purpose of configuring node-agent in future, at present, there is only one kind of configuration as the data in the configMap, the name is ```data-path-concurrency```. +The ```node-agent-configs``` configMap may be used for other purpose of configuring node-agent in future, at present, there is only one kind of configuration as the data in the configMap, the name is ```dataPathConcurrency```. -The data structure for ```data-path-concurrency``` is as below: +The data structure for ```node-agent-configs``` is as below: ```go +type Configs struct { + // DataPathConcurrency is the config for data path concurrency per node. + DataPathConcurrency *DataPathConcurrency `json:"dataPathConcurrency,omitempty"` +} + type DataPathConcurrency struct { // GlobalConfig specifies the concurrency number to all nodes for which per-node config is not specified GlobalConfig int `json:"globalConfig,omitempty"` @@ -50,7 +55,7 @@ type RuledConfigs struct { ``` ### Global concurrent number -We allow users to specify a concurrent number that will be applied to all nodes if the per-node number is not specified. This number is set through ```globalConfig``` field in ```data-path-concurrency```. +We allow users to specify a concurrent number that will be applied to all nodes if the per-node number is not specified. This number is set through ```globalConfig``` field in ```dataPathConcurrency```. The number starts from 1 which means there is no concurrency, only one instance of VGDP is allowed. There is no roof limit. If this number is not specified or not valid, a hard-coded default value will be used, the value is set to 1. @@ -62,7 +67,7 @@ We allow users to specify different concurrent number per node, for example, use The range of Per-node concurrent number is the same with Global concurrent number. 
Per-node concurrent number is preferable to Global concurrent number, so it will overwrite the Global concurrent number for that node. -Per-node concurrent number is implemented through ```perNodeConfig``` field in ```data-path-concurrency```. +Per-node concurrent number is implemented through ```perNodeConfig``` field in ```dataPathConcurrency```. ```perNodeConfig``` is a list of ```RuledConfigs``` each item of which matches one or more nodes by label selectors and specify the concurrent number for the matched nodes. This means, the nodes are identified by labels. @@ -80,30 +85,32 @@ If one node falls into more than one rules, e.g., if node1 also has the label ``` A sample of the ```node-agent-configs``` configMap is as below: ```json { - "globalConfig": 2, - "perNodeConfig": [ - { - "nodeSelector": { - "matchLabels": { - "kubernetes.io/hostname": "node1" - } + "dataPathConcurrency": { + "globalConfig": 2, + "perNodeConfig": [ + { + "nodeSelector": { + "matchLabels": { + "kubernetes.io/hostname": "node1" + } + }, + "number": 3 }, - "number": 3 - }, - { - "nodeSelector": { - "matchLabels": { - "beta.kubernetes.io/instance-type": "Standard_B4ms" - } - }, - "number": 5 - } - ] + { + "nodeSelector": { + "matchLabels": { + "beta.kubernetes.io/instance-type": "Standard_B4ms" + } + }, + "number": 5 + } + ] + } } ``` To create the configMap, users need to save something like the above sample to a json file and then run below command: ``` -kubectl create cm node-agent-configs -n velero --from-file=data-path-concurrency=<json file name> +kubectl create cm node-agent-configs -n velero --from-file=<json file name> ``` ### Global data path manager diff --git a/internal/credentials/file_store.go b/internal/credentials/file_store.go index 1332d4f8d..4b5d25664 100644 --- a/internal/credentials/file_store.go +++ b/internal/credentials/file_store.go @@ -71,7 +71,7 @@ func (n *namespacedFileStore) Path(selector *corev1api.SecretKeySelector) (strin keyFilePath := filepath.Join(n.fsRoot, fmt.Sprintf("%s-%s",
selector.Name, selector.Key)) - file, err := n.fs.OpenFile(keyFilePath, os.O_RDWR|os.O_CREATE, 0644) + file, err := n.fs.OpenFile(keyFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { return "", errors.Wrap(err, "unable to open credentials file for writing") } diff --git a/pkg/builder/node_builder.go b/pkg/builder/node_builder.go index 52e2c1e2e..d3b6f51ec 100644 --- a/pkg/builder/node_builder.go +++ b/pkg/builder/node_builder.go @@ -41,6 +41,11 @@ func ForNode(name string) *NodeBuilder { } } +func (b *NodeBuilder) Labels(labels map[string]string) *NodeBuilder { + b.object.Labels = labels + return b +} + // Result returns the built Node. func (b *NodeBuilder) Result() *corev1api.Node { return b.object diff --git a/pkg/builder/pod_builder.go b/pkg/builder/pod_builder.go index 8931c14b9..886d7a411 100644 --- a/pkg/builder/pod_builder.go +++ b/pkg/builder/pod_builder.go @@ -101,3 +101,8 @@ func (b *PodBuilder) ContainerStatuses(containerStatuses ...*corev1api.Container } return b } + +func (b *PodBuilder) Phase(phase corev1api.PodPhase) *PodBuilder { + b.object.Status.Phase = phase + return b +} diff --git a/pkg/builder/volume_snapshot_builder.go b/pkg/builder/volume_snapshot_builder.go index 19815c0f0..bbaedd16e 100644 --- a/pkg/builder/volume_snapshot_builder.go +++ b/pkg/builder/volume_snapshot_builder.go @@ -67,3 +67,8 @@ func (v *VolumeSnapshotBuilder) BoundVolumeSnapshotContentName(vscName string) * v.object.Status.BoundVolumeSnapshotContentName = &vscName return v } + +func (v *VolumeSnapshotBuilder) SourcePVC(name string) *VolumeSnapshotBuilder { + v.object.Spec.Source.PersistentVolumeClaimName = &name + return v +} diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 105e37052..835b899c3 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -19,6 +19,7 @@ package nodeagent import ( "context" "fmt" + "math" "net/http" "os" "strings" @@ -32,6 +33,7 @@ import ( storagev1api 
"k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -53,7 +55,9 @@ import ( "github.com/vmware-tanzu/velero/pkg/cmd" "github.com/vmware-tanzu/velero/pkg/cmd/util/signals" "github.com/vmware-tanzu/velero/pkg/controller" + "github.com/vmware-tanzu/velero/pkg/datapath" "github.com/vmware-tanzu/velero/pkg/metrics" + "github.com/vmware-tanzu/velero/pkg/nodeagent" "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/logging" @@ -73,6 +77,7 @@ const ( defaultResourceTimeout = 10 * time.Minute defaultDataMoverPrepareTimeout = 30 * time.Minute + defaultDataPathConcurrentNum = 1 ) type nodeAgentServerConfig struct { @@ -132,6 +137,7 @@ type nodeAgentServer struct { config nodeAgentServerConfig kubeClient kubernetes.Interface csiSnapshotClient *snapshotv1client.Clientset + dataPathMgr *datapath.Manager } func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, config nodeAgentServerConfig) (*nodeAgentServer, error) { @@ -219,6 +225,10 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi if err != nil { return nil, err } + + dataPathConcurrentNum := s.getDataPathConcurrentNum(defaultDataPathConcurrentNum) + s.dataPathMgr = datapath.NewManager(dataPathConcurrentNum) + return s, nil } @@ -263,24 +273,24 @@ func (s *nodeAgentServer) run() { credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore} repoEnsurer := repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout) - pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), repoEnsurer, + pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.dataPathMgr, repoEnsurer, credentialGetter, 
s.nodeName, s.mgr.GetScheme(), s.metrics, s.logger) if err := pvbReconciler.SetupWithManager(s.mgr); err != nil { s.logger.Fatal(err, "unable to create controller", "controller", controller.PodVolumeBackup) } - if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil { + if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.dataPathMgr, repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") } - dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) + dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) s.markDataUploadsCancel(dataUploadReconciler) if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data upload controller") } - dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) + dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) s.markDataDownloadsCancel(dataDownloadReconciler) if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data download controller") @@ -478,3 +488,65 @@ func (s *nodeAgentServer) 
markInProgressPVRsFailed(client ctrlclient.Client) { s.logger.WithField("podvolumerestore", pvr.GetName()).Warn(pvr.Status.Message) } } + +var getConfigsFunc = nodeagent.GetConfigs + +func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int) int { + configs, err := getConfigsFunc(s.ctx, s.namespace, s.kubeClient) + if err != nil { + s.logger.WithError(err).Warn("Failed to get node agent configs") + return defaultNum + } + + if configs == nil || configs.DataPathConcurrency == nil { + s.logger.Infof("Concurrency configs are not found, use the default number %v", defaultNum) + return defaultNum + } + + globalNum := configs.DataPathConcurrency.GlobalConfig + + if globalNum <= 0 { + s.logger.Warnf("Global number %v is invalid, use the default value %v", globalNum, defaultNum) + globalNum = defaultNum + } + + if len(configs.DataPathConcurrency.PerNodeConfig) == 0 { + return globalNum + } + + curNode, err := s.kubeClient.CoreV1().Nodes().Get(s.ctx, s.nodeName, metav1.GetOptions{}) + if err != nil { + s.logger.WithError(err).Warnf("Failed to get node info for %s, use the global number %v", s.nodeName, globalNum) + return globalNum + } + + concurrentNum := math.MaxInt32 + + for _, rule := range configs.DataPathConcurrency.PerNodeConfig { + selector, err := metav1.LabelSelectorAsSelector(&rule.NodeSelector) + if err != nil { + s.logger.WithError(err).Warnf("Failed to parse rule with label selector %s, skip it", rule.NodeSelector.String()) + continue + } + + if rule.Number <= 0 { + s.logger.Warnf("Rule with label selector %s is with an invalid number %v, skip it", rule.NodeSelector.String(), rule.Number) + continue + } + + if selector.Matches(labels.Set(curNode.GetLabels())) { + if concurrentNum > rule.Number { + concurrentNum = rule.Number + } + } + } + + if concurrentNum == math.MaxInt32 { + s.logger.Infof("Per node number for node %s is not found, use the global number %v", s.nodeName, globalNum) + concurrentNum = globalNum + } else { + s.logger.Infof("Use the per 
node number %v over global number %v for node %s", concurrentNum, globalNum, s.nodeName) + } + + return concurrentNum +} diff --git a/pkg/cmd/cli/nodeagent/server_test.go b/pkg/cmd/cli/nodeagent/server_test.go index d66fc08eb..4472dfce1 100644 --- a/pkg/cmd/cli/nodeagent/server_test.go +++ b/pkg/cmd/cli/nodeagent/server_test.go @@ -17,16 +17,22 @@ package nodeagent import ( "context" + "fmt" "os" "path/filepath" + "strings" "testing" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/nodeagent" testutil "github.com/vmware-tanzu/velero/pkg/test" ) @@ -107,3 +113,259 @@ func Test_validatePodVolumesHostPath(t *testing.T) { }) } } + +func Test_getDataPathConcurrentNum(t *testing.T) { + defaultNum := 100001 + globalNum := 6 + nodeName := "node-agent-node" + node1 := builder.ForNode("node-agent-node").Result() + node2 := builder.ForNode("node-agent-node").Labels(map[string]string{ + "host-name": "node-1", + "xxxx": "yyyyy", + }).Result() + + invalidLabelSelector := metav1.LabelSelector{ + MatchLabels: map[string]string{ + "inva/lid": "inva/lid", + }, + } + validLabelSelector1 := metav1.LabelSelector{ + MatchLabels: map[string]string{ + "host-name": "node-1", + }, + } + validLabelSelector2 := metav1.LabelSelector{ + MatchLabels: map[string]string{ + "xxxx": "yyyyy", + }, + } + + tests := []struct { + name string + getFunc func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) + setKubeClient bool + kubeClientObj []runtime.Object + expectNum int + expectLog string + }{ + { + name: "failed to get configs", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return nil, errors.New("fake-get-error") + }, + expectLog: "Failed to get node agent 
configs", + expectNum: defaultNum, + }, + { + name: "configs cm not found", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return nil, nil + }, + expectLog: fmt.Sprintf("Concurrency configs are not found, use the default number %v", defaultNum), + expectNum: defaultNum, + }, + { + name: "configs cm's data path concurrency is nil", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{}, nil + }, + expectLog: fmt.Sprintf("Concurrency configs are not found, use the default number %v", defaultNum), + expectNum: defaultNum, + }, + { + name: "global number is invalid", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: -1, + }, + }, nil + }, + expectLog: fmt.Sprintf("Global number %v is invalid, use the default value %v", -1, defaultNum), + expectNum: defaultNum, + }, + { + name: "global number is valid", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: globalNum, + }, + }, nil + }, + expectNum: globalNum, + }, + { + name: "node is not found", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + Number: 100, + }, + }, + }, + }, nil + }, + setKubeClient: true, + expectLog: fmt.Sprintf("Failed to get node info for %s, use the global number %v", nodeName, globalNum), + expectNum: globalNum, + }, + { + name: "failed to get selector", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: 
&nodeagent.DataPathConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: invalidLabelSelector, + Number: 100, + }, + }, + }, + }, nil + }, + setKubeClient: true, + kubeClientObj: []runtime.Object{node1}, + expectLog: fmt.Sprintf("Failed to parse rule with label selector %s, skip it", invalidLabelSelector.String()), + expectNum: globalNum, + }, + { + name: "rule number is invalid", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: -1, + }, + }, + }, + }, nil + }, + setKubeClient: true, + kubeClientObj: []runtime.Object{node1}, + expectLog: fmt.Sprintf("Rule with label selector %s is with an invalid number %v, skip it", validLabelSelector1.String(), -1), + expectNum: globalNum, + }, + { + name: "label doesn't match", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: -1, + }, + }, + }, + }, nil + }, + setKubeClient: true, + kubeClientObj: []runtime.Object{node1}, + expectLog: fmt.Sprintf("Per node number for node %s is not found, use the global number %v", nodeName, globalNum), + expectNum: globalNum, + }, + { + name: "match one rule", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: 66, + }, + }, + }, + }, nil + }, + setKubeClient: true, + kubeClientObj: []runtime.Object{node2}, + 
expectLog: fmt.Sprintf("Use the per node number %v over global number %v for node %s", 66, globalNum, nodeName), + expectNum: 66, + }, + { + name: "match multiple rules", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: 66, + }, + { + NodeSelector: validLabelSelector2, + Number: 36, + }, + }, + }, + }, nil + }, + setKubeClient: true, + kubeClientObj: []runtime.Object{node2}, + expectLog: fmt.Sprintf("Use the per node number %v over global number %v for node %s", 36, globalNum, nodeName), + expectNum: 36, + }, + { + name: "match multiple rules 2", + getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) { + return &nodeagent.Configs{ + DataPathConcurrency: &nodeagent.DataPathConcurrency{ + GlobalConfig: globalNum, + PerNodeConfig: []nodeagent.RuledConfigs{ + { + NodeSelector: validLabelSelector1, + Number: 36, + }, + { + NodeSelector: validLabelSelector2, + Number: 66, + }, + }, + }, + }, nil + }, + setKubeClient: true, + kubeClientObj: []runtime.Object{node2}, + expectLog: fmt.Sprintf("Use the per node number %v over global number %v for node %s", 36, globalNum, nodeName), + expectNum: 36, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) 
+ + logBuffer := "" + + s := &nodeAgentServer{ + nodeName: nodeName, + logger: testutil.NewSingleLogger(&logBuffer), + } + + if test.setKubeClient { + s.kubeClient = fakeKubeClient + } + + getConfigsFunc = test.getFunc + + num := s.getDataPathConcurrentNum(defaultNum) + assert.Equal(t, test.expectNum, num) + if test.expectLog == "" { + assert.Equal(t, "", logBuffer) + } else { + assert.True(t, strings.Contains(logBuffer, test.expectLog)) + } + }) + } +} diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index 0212fa00f..bf4299ea4 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -69,7 +69,7 @@ type DataDownloadReconciler struct { metrics *metrics.ServerMetrics } -func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface, +func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler { return &DataDownloadReconciler{ client: client, @@ -81,7 +81,7 @@ func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Inter nodeName: nodeName, repositoryEnsurer: repoEnsurer, restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger), - dataPathMgr: datapath.NewManager(1), + dataPathMgr: dataPathMgr, preparingTimeout: preparingTimeout, metrics: metrics, } diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index 46162f0cb..de9fa7516 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -142,7 +142,10 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ... 
if err != nil { return nil, err } - return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil + + dataPathMgr := datapath.NewManager(1) + + return NewDataDownloadReconciler(fakeClient, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } func TestDataDownloadReconcile(t *testing.T) { diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 11c4a0f23..9465528e3 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -79,8 +79,8 @@ type DataUploadReconciler struct { metrics *metrics.ServerMetrics } -func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface, - csiSnapshotClient snapshotter.SnapshotV1Interface, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution, +func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface, + dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution, cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler { return &DataUploadReconciler{ client: client, @@ -93,7 +93,7 @@ func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interfa logger: log, repoEnsurer: repoEnsurer, snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(kubeClient, csiSnapshotClient, log)}, - dataPathMgr: datapath.NewManager(1), + dataPathMgr: dataPathMgr, preparingTimeout: 
preparingTimeout, metrics: metrics, } diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index 34bc4a6aa..b61cd07b3 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -162,6 +162,8 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci Spec: appsv1.DaemonSetSpec{}, } + dataPathMgr := datapath.NewManager(1) + now, err := time.Parse(time.RFC1123, time.RFC1123) if err != nil { return nil, err @@ -217,7 +219,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci if err != nil { return nil, err } - return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), nil, + return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index f074e0cc2..6a66f98e1 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -47,7 +47,7 @@ import ( const pVBRRequestor string = "pod-volume-backup-restore" // NewPodVolumeBackupReconciler creates the PodVolumeBackupReconciler instance -func NewPodVolumeBackupReconciler(client client.Client, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, +func NewPodVolumeBackupReconciler(client client.Client, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, scheme *runtime.Scheme, metrics *metrics.ServerMetrics, logger logrus.FieldLogger) *PodVolumeBackupReconciler { return &PodVolumeBackupReconciler{ Client: client, @@ -59,7 +59,7 @@ func 
NewPodVolumeBackupReconciler(client client.Client, ensurer *repository.Ensu clock: &clocks.RealClock{}, scheme: scheme, metrics: metrics, - dataPathMgr: datapath.NewManager(1), + dataPathMgr: dataPathMgr, } } diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index d48925710..34d6c6551 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -50,7 +50,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/util/filesystem" ) -func NewPodVolumeRestoreReconciler(client client.Client, ensurer *repository.Ensurer, +func NewPodVolumeRestoreReconciler(client client.Client, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, logger logrus.FieldLogger) *PodVolumeRestoreReconciler { return &PodVolumeRestoreReconciler{ Client: client, @@ -59,7 +59,7 @@ func NewPodVolumeRestoreReconciler(client client.Client, ensurer *repository.Ens credentialGetter: credentialGetter, fileSystem: filesystem.NewFileSystem(), clock: &clocks.RealClock{}, - dataPathMgr: datapath.NewManager(1), + dataPathMgr: dataPathMgr, } } diff --git a/pkg/controller/restore_controller.go b/pkg/controller/restore_controller.go index 5d6ed505e..f6b9b39d9 100644 --- a/pkg/controller/restore_controller.go +++ b/pkg/controller/restore_controller.go @@ -515,6 +515,11 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu return errors.Wrap(err, "error fetching volume snapshots metadata") } + csiVolumeSnapshots, err := backupStore.GetCSIVolumeSnapshots(restore.Spec.BackupName) + if err != nil { + return errors.Wrap(err, "fail to fetch CSI VolumeSnapshots metadata") + } + restoreLog.Info("starting restore") var podVolumeBackups []*api.PodVolumeBackup @@ -531,6 +536,7 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu BackupReader: backupFile, ResourceModifiers: resourceModifiers, 
DisableInformerCache: r.disableInformerCache, + CSIVolumeSnapshots: csiVolumeSnapshots, } restoreWarnings, restoreErrors := r.restorer.RestoreWithResolvers(restoreReq, actionsResolver, pluginManager) diff --git a/pkg/controller/restore_controller_test.go b/pkg/controller/restore_controller_test.go index b7f6fbd25..9437f1d1c 100644 --- a/pkg/controller/restore_controller_test.go +++ b/pkg/controller/restore_controller_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -471,6 +472,7 @@ func TestRestoreReconcile(t *testing.T) { } if test.expectedRestorerCall != nil { backupStore.On("GetBackupContents", test.backup.Name).Return(io.NopCloser(bytes.NewReader([]byte("hello world"))), nil) + backupStore.On("GetCSIVolumeSnapshots", test.backup.Name).Return([]*snapshotv1api.VolumeSnapshot{}, nil) restorer.On("RestoreWithResolvers", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(warnings, errors) diff --git a/pkg/nodeagent/node_agent.go b/pkg/nodeagent/node_agent.go index 629ae04f7..ff93ed596 100644 --- a/pkg/nodeagent/node_agent.go +++ b/pkg/nodeagent/node_agent.go @@ -18,6 +18,7 @@ package nodeagent import ( "context" + "encoding/json" "fmt" "github.com/pkg/errors" @@ -33,13 +34,36 @@ import ( const ( // daemonSet is the name of the Velero node agent daemonset. 
- daemonSet = "node-agent" + daemonSet = "node-agent" + configName = "node-agent-configs" + dataPathConConfigName = "data-path-concurrency" ) var ( ErrDaemonSetNotFound = errors.New("daemonset not found") ) +type DataPathConcurrency struct { + // GlobalConfig specifies the concurrency number to all nodes for which per-node config is not specified + GlobalConfig int `json:"globalConfig,omitempty"` + + // PerNodeConfig specifies the concurrency number to nodes matched by rules + PerNodeConfig []RuledConfigs `json:"perNodeConfig,omitempty"` +} + +type RuledConfigs struct { + // NodeSelector specifies the label selector to match nodes + NodeSelector metav1.LabelSelector `json:"nodeSelector"` + + // Number specifies the number value associated to the matched nodes + Number int `json:"number"` +} + +type Configs struct { + // DataPathConcurrency is the config for data path concurrency per node. + DataPathConcurrency *DataPathConcurrency `json:"dataPathConcurrency,omitempty"` +} + // IsRunning checks if the node agent daemonset is running properly. 
If not, return the error found func IsRunning(ctx context.Context, kubeClient kubernetes.Interface, namespace string) error { if _, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, daemonSet, metav1.GetOptions{}); apierrors.IsNotFound(err) { @@ -89,3 +113,31 @@ func GetPodSpec(ctx context.Context, kubeClient kubernetes.Interface, namespace return &ds.Spec.Template.Spec, nil } + +func GetConfigs(ctx context.Context, namespace string, kubeClient kubernetes.Interface) (*Configs, error) { + cm, err := kubeClient.CoreV1().ConfigMaps(namespace).Get(ctx, configName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, nil + } else { + return nil, errors.Wrapf(err, "error to get node agent configs %s", configName) + } + } + + if cm.Data == nil { + return nil, errors.Errorf("data is not available in config map %s", configName) + } + + jsonString := "" + for _, v := range cm.Data { + jsonString = v + } + + configs := &Configs{} + err = json.Unmarshal([]byte(jsonString), configs) + if err != nil { + return nil, errors.Wrapf(err, "error to unmarshall configs from %s", configName) + } + + return configs, nil +} diff --git a/pkg/nodeagent/node_agent_test.go b/pkg/nodeagent/node_agent_test.go new file mode 100644 index 000000000..a18e45b14 --- /dev/null +++ b/pkg/nodeagent/node_agent_test.go @@ -0,0 +1,337 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package nodeagent + +import ( + "context" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + clientTesting "k8s.io/client-go/testing" + clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/vmware-tanzu/velero/pkg/builder" +) + +type reactor struct { + verb string + resource string + reactorFunc clientTesting.ReactionFunc +} + +func TestIsRunning(t *testing.T) { + daemonSet := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-ns", + Name: "node-agent", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + }, + } + + tests := []struct { + name string + kubeClientObj []runtime.Object + namespace string + kubeReactors []reactor + expectErr string + }{ + { + name: "ds is not found", + namespace: "fake-ns", + expectErr: "daemonset not found", + }, + { + name: "ds get error", + namespace: "fake-ns", + kubeReactors: []reactor{ + { + verb: "get", + resource: "daemonsets", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-get-error") + }, + }, + }, + expectErr: "fake-get-error", + }, + { + name: "succeed", + namespace: "fake-ns", + kubeClientObj: []runtime.Object{ + daemonSet, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) 
+ + for _, reactor := range test.kubeReactors { + fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc) + } + + err := IsRunning(context.TODO(), fakeKubeClient, test.namespace) + if test.expectErr == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, test.expectErr) + } + }) + } +} + +func TestIsRunningInNode(t *testing.T) { + scheme := runtime.NewScheme() + corev1.AddToScheme(scheme) + + nonNodeAgentPod := builder.ForPod("fake-ns", "fake-pod").Result() + nodeAgentPodNotRunning := builder.ForPod("fake-ns", "fake-pod").Labels(map[string]string{"name": "node-agent"}).Result() + nodeAgentPodRunning1 := builder.ForPod("fake-ns", "fake-pod-1").Labels(map[string]string{"name": "node-agent"}).Phase(corev1.PodRunning).Result() + nodeAgentPodRunning2 := builder.ForPod("fake-ns", "fake-pod-2").Labels(map[string]string{"name": "node-agent"}).Phase(corev1.PodRunning).Result() + nodeAgentPodRunning3 := builder.ForPod("fake-ns", "fake-pod-3"). + Labels(map[string]string{"name": "node-agent"}). + Phase(corev1.PodRunning). + NodeName("fake-node"). 
+ Result() + + tests := []struct { + name string + kubeClientObj []runtime.Object + nodeName string + expectErr string + }{ + { + name: "node name is empty", + expectErr: "node name is empty", + }, + { + name: "ds pod not found", + nodeName: "fake-node", + kubeClientObj: []runtime.Object{ + nonNodeAgentPod, + }, + expectErr: "daemonset pod not found in running state in node fake-node", + }, + { + name: "ds po are not all running", + nodeName: "fake-node", + kubeClientObj: []runtime.Object{ + nodeAgentPodNotRunning, + nodeAgentPodRunning1, + }, + expectErr: "daemonset pod not found in running state in node fake-node", + }, + { + name: "ds pods wrong node name", + nodeName: "fake-node", + kubeClientObj: []runtime.Object{ + nodeAgentPodNotRunning, + nodeAgentPodRunning1, + nodeAgentPodRunning2, + }, + expectErr: "daemonset pod not found in running state in node fake-node", + }, + { + name: "succeed", + nodeName: "fake-node", + kubeClientObj: []runtime.Object{ + nodeAgentPodNotRunning, + nodeAgentPodRunning1, + nodeAgentPodRunning2, + nodeAgentPodRunning3, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeClientBuilder := clientFake.NewClientBuilder() + fakeClientBuilder = fakeClientBuilder.WithScheme(scheme) + + fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build() + + err := IsRunningInNode(context.TODO(), "", test.nodeName, fakeClient) + if test.expectErr == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, test.expectErr) + } + }) + } +} + +func TestGetPodSpec(t *testing.T) { + podSpec := corev1.PodSpec{ + NodeName: "fake-node", + } + + daemonSet := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fake-ns", + Name: "node-agent", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + }, + Spec: appsv1.DaemonSetSpec{ + Template: corev1.PodTemplateSpec{ + Spec: podSpec, + }, + }, + } + + tests := []struct { + name string + kubeClientObj []runtime.Object + 
namespace string + expectErr string + expectSpec corev1.PodSpec + }{ + { + name: "ds is not found", + namespace: "fake-ns", + expectErr: "error to get node-agent daemonset: daemonsets.apps \"node-agent\" not found", + }, + { + name: "succeed", + namespace: "fake-ns", + kubeClientObj: []runtime.Object{ + daemonSet, + }, + expectSpec: podSpec, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) + + spec, err := GetPodSpec(context.TODO(), fakeKubeClient, test.namespace) + if test.expectErr == "" { + assert.NoError(t, err) + assert.Equal(t, *spec, test.expectSpec) + } else { + assert.EqualError(t, err, test.expectErr) + } + }) + } +} + +func TestGetConfigs(t *testing.T) { + cm := builder.ForConfigMap("fake-ns", "node-agent-configs").Result() + cmWithInvalidDataFormat := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("fake-key", "wrong").Result() + cmWithoutCocurrentData := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("fake-key", "{\"someothers\":{\"someother\": 10}}").Result() + cmWithValidData := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("fake-key", "{\"dataPathConcurrency\":{\"globalConfig\": 5}}").Result() + + tests := []struct { + name string + kubeClientObj []runtime.Object + namespace string + kubeReactors []reactor + expectResult *Configs + expectErr string + }{ + { + name: "cm is not found", + namespace: "fake-ns", + }, + { + name: "cm get error", + namespace: "fake-ns", + kubeReactors: []reactor{ + { + verb: "get", + resource: "configmaps", + reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.New("fake-get-error") + }, + }, + }, + expectErr: "error to get node agent configs node-agent-configs: fake-get-error", + }, + { + name: "cm's data is nil", + namespace: "fake-ns", + kubeClientObj: []runtime.Object{ + cm, + }, + expectErr: "data is not available in 
config map node-agent-configs", + }, + { + name: "cm's data is with invalid format", + namespace: "fake-ns", + kubeClientObj: []runtime.Object{ + cmWithInvalidDataFormat, + }, + expectErr: "error to unmarshall configs from node-agent-configs: invalid character 'w' looking for beginning of value", + }, + { + name: "concurrency configs are not found", + namespace: "fake-ns", + kubeClientObj: []runtime.Object{ + cmWithoutCocurrentData, + }, + expectResult: &Configs{nil}, + }, + { + name: "success", + namespace: "fake-ns", + kubeClientObj: []runtime.Object{ + cmWithValidData, + }, + expectResult: &Configs{ + DataPathConcurrency: &DataPathConcurrency{ + GlobalConfig: 5, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...) + + for _, reactor := range test.kubeReactors { + fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc) + } + + result, err := GetConfigs(context.TODO(), test.namespace, fakeKubeClient) + if test.expectErr == "" { + assert.NoError(t, err) + + if test.expectResult == nil { + assert.Nil(t, result) + } else if test.expectResult.DataPathConcurrency == nil { + assert.Nil(t, result.DataPathConcurrency) + } else { + assert.Equal(t, *test.expectResult.DataPathConcurrency, *result.DataPathConcurrency) + } + } else { + assert.EqualError(t, err, test.expectErr) + } + }) + } +} diff --git a/pkg/restore/request.go b/pkg/restore/request.go index dcc2ef3d6..2a267a5ff 100644 --- a/pkg/restore/request.go +++ b/pkg/restore/request.go @@ -21,6 +21,7 @@ import ( "io" "sort" + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/runtime" @@ -60,6 +61,7 @@ type Request struct { itemOperationsList *[]*itemoperation.RestoreOperation ResourceModifiers *resourcemodifiers.ResourceModifiers DisableInformerCache bool + CSIVolumeSnapshots 
[]*snapshotv1api.VolumeSnapshot } type restoredItemStatus struct { diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go index 83ac84ed7..4f25a63e8 100644 --- a/pkg/restore/restore.go +++ b/pkg/restore/restore.go @@ -30,6 +30,7 @@ import ( "time" "github.com/google/uuid" + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" @@ -302,6 +303,7 @@ func (kr *kubernetesRestorer) RestoreWithResolvers( pvsToProvision: sets.NewString(), pvRestorer: pvRestorer, volumeSnapshots: req.VolumeSnapshots, + csiVolumeSnapshots: req.CSIVolumeSnapshots, podVolumeBackups: req.PodVolumeBackups, resourceTerminatingTimeout: kr.resourceTerminatingTimeout, resourceTimeout: kr.resourceTimeout, @@ -352,6 +354,7 @@ type restoreContext struct { pvsToProvision sets.String pvRestorer PVRestorer volumeSnapshots []*volume.Snapshot + csiVolumeSnapshots []*snapshotv1api.VolumeSnapshot podVolumeBackups []*velerov1api.PodVolumeBackup resourceTerminatingTimeout time.Duration resourceTimeout time.Duration @@ -1293,7 +1296,11 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } case hasPodVolumeBackup(obj, ctx): - ctx.log.Infof("Dynamically re-provisioning persistent volume because it has a pod volume backup to be restored.") + ctx.log.WithFields(logrus.Fields{ + "namespace": obj.GetNamespace(), + "name": obj.GetName(), + "groupResource": groupResource.String(), + }).Infof("Dynamically re-provisioning persistent volume because it has a pod volume backup to be restored.") ctx.pvsToProvision.Insert(name) // Return early because we don't want to restore the PV itself, we @@ -1301,15 +1308,45 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso return warnings, errs, itemExists case hasCSIVolumeSnapshot(ctx, obj): - case hasSnapshotDataUpload(ctx, obj): + ctx.log.WithFields(logrus.Fields{ + "namespace": 
obj.GetNamespace(), + "name": obj.GetName(), + "groupResource": groupResource.String(), + }).Infof("Dynamically re-provisioning persistent volume because it has a related CSI VolumeSnapshot.") + ctx.pvsToProvision.Insert(name) + if ready, err := ctx.featureVerifier.Verify(velerov1api.CSIFeatureFlag); !ready { ctx.log.Errorf("Failed to verify CSI modules, ready %v, err %v", ready, err) errs.Add(namespace, fmt.Errorf("CSI modules are not ready for restore. Check CSI feature is enabled and CSI plugin is installed")) - return warnings, errs, itemExists } + // Return early because we don't want to restore the PV itself, we + // want to dynamically re-provision it. + return warnings, errs, itemExists + + case hasSnapshotDataUpload(ctx, obj): + ctx.log.WithFields(logrus.Fields{ + "namespace": obj.GetNamespace(), + "name": obj.GetName(), + "groupResource": groupResource.String(), + }).Infof("Dynamically re-provisioning persistent volume because it has a related snapshot DataUpload.") + ctx.pvsToProvision.Insert(name) + + if ready, err := ctx.featureVerifier.Verify(velerov1api.CSIFeatureFlag); !ready { + ctx.log.Errorf("Failed to verify CSI modules, ready %v, err %v", ready, err) + errs.Add(namespace, fmt.Errorf("CSI modules are not ready for restore. Check CSI feature is enabled and CSI plugin is installed")) + } + + // Return early because we don't want to restore the PV itself, we + // want to dynamically re-provision it. 
+ return warnings, errs, itemExists + case hasDeleteReclaimPolicy(obj.Object): - ctx.log.Infof("Dynamically re-provisioning persistent volume because it doesn't have a snapshot and its reclaim policy is Delete.") + ctx.log.WithFields(logrus.Fields{ + "namespace": obj.GetNamespace(), + "name": obj.GetName(), + "groupResource": groupResource.String(), + }).Infof("Dynamically re-provisioning persistent volume because it doesn't have a snapshot and its reclaim policy is Delete.") ctx.pvsToProvision.Insert(name) // Return early because we don't want to restore the PV itself, we @@ -1317,7 +1354,11 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso return warnings, errs, itemExists default: - ctx.log.Infof("Restoring persistent volume as-is because it doesn't have a snapshot and its reclaim policy is not Delete.") + ctx.log.WithFields(logrus.Fields{ + "namespace": obj.GetNamespace(), + "name": obj.GetName(), + "groupResource": groupResource.String(), + }).Infof("Restoring persistent volume as-is because it doesn't have a snapshot and its reclaim policy is not Delete.") // Check to see if the claimRef.namespace field needs to be remapped, and do so if necessary. 
_, err = remapClaimRefNS(ctx, obj) @@ -1952,6 +1993,18 @@ func hasSnapshot(pvName string, snapshots []*volume.Snapshot) bool { } func hasCSIVolumeSnapshot(ctx *restoreContext, unstructuredPV *unstructured.Unstructured) bool { + pv := new(v1.PersistentVolume) + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredPV.Object, pv); err != nil { + ctx.log.WithError(err).Warnf("Unable to convert PV from unstructured to structured") + return false + } + + for _, vs := range ctx.csiVolumeSnapshots { + if pv.Spec.ClaimRef.Name == *vs.Spec.Source.PersistentVolumeClaimName && + pv.Spec.ClaimRef.Namespace == vs.Namespace { + return true + } + } return false } diff --git a/pkg/restore/restore_test.go b/pkg/restore/restore_test.go index 4ffd76257..643d001af 100644 --- a/pkg/restore/restore_test.go +++ b/pkg/restore/restore_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -2256,6 +2257,7 @@ func (*volumeSnapshotter) DeleteSnapshot(snapshotID string) error { // Verification is done by looking at the contents of the API and the metadata/spec/status of // the items in the API. 
func TestRestorePersistentVolumes(t *testing.T) { + testPVCName := "testPVC" tests := []struct { name string restore *velerov1api.Restore @@ -2265,6 +2267,8 @@ func TestRestorePersistentVolumes(t *testing.T) { volumeSnapshots []*volume.Snapshot volumeSnapshotLocations []*velerov1api.VolumeSnapshotLocation volumeSnapshotterGetter volumeSnapshotterGetter + csiVolumeSnapshots []*snapshotv1api.VolumeSnapshot + dataUploadResult *corev1api.ConfigMap want []*test.APIResource wantError bool wantWarning bool @@ -2923,6 +2927,77 @@ func TestRestorePersistentVolumes(t *testing.T) { ), }, }, + { + name: "when a PV with a reclaim policy of retain has a CSI VolumeSnapshot and does not exist in-cluster, the PV is not restored", + restore: defaultRestore().Result(), + backup: defaultBackup().Result(), + tarball: test.NewTarWriter(t). + AddItems("persistentvolumes", + builder.ForPersistentVolume("pv-1"). + ReclaimPolicy(corev1api.PersistentVolumeReclaimRetain). + ClaimRef("velero", testPVCName). + Result(), + ). 
+ Done(), + apiResources: []*test.APIResource{ + test.PVs(), + test.PVCs(), + }, + csiVolumeSnapshots: []*snapshotv1api.VolumeSnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "test", + }, + Spec: snapshotv1api.VolumeSnapshotSpec{ + Source: snapshotv1api.VolumeSnapshotSource{ + PersistentVolumeClaimName: &testPVCName, + }, + }, + }, + }, + volumeSnapshotLocations: []*velerov1api.VolumeSnapshotLocation{ + builder.ForVolumeSnapshotLocation(velerov1api.DefaultNamespace, "default").Provider("provider-1").Result(), + }, + volumeSnapshotterGetter: map[string]vsv1.VolumeSnapshotter{ + "provider-1": &volumeSnapshotter{ + snapshotVolumes: map[string]string{"snapshot-1": "new-volume"}, + }, + }, + want: []*test.APIResource{}, + }, + { + name: "when a PV with a reclaim policy of retain has a DataUpload result CM and does not exist in-cluster, the PV is not restored", + restore: defaultRestore().ObjectMeta(builder.WithUID("fakeUID")).Result(), + backup: defaultBackup().Result(), + tarball: test.NewTarWriter(t). + AddItems("persistentvolumes", + builder.ForPersistentVolume("pv-1"). + ReclaimPolicy(corev1api.PersistentVolumeReclaimRetain). + ClaimRef("velero", testPVCName). + Result(), + ). 
+ Done(), + apiResources: []*test.APIResource{ + test.PVs(), + test.PVCs(), + test.ConfigMaps(), + }, + volumeSnapshotLocations: []*velerov1api.VolumeSnapshotLocation{ + builder.ForVolumeSnapshotLocation(velerov1api.DefaultNamespace, "default").Provider("provider-1").Result(), + }, + volumeSnapshotterGetter: map[string]vsv1.VolumeSnapshotter{ + "provider-1": &volumeSnapshotter{ + snapshotVolumes: map[string]string{"snapshot-1": "new-volume"}, + }, + }, + dataUploadResult: builder.ForConfigMap("velero", "test").ObjectMeta(builder.WithLabelsMap(map[string]string{ + velerov1api.RestoreUIDLabel: "fakeUID", + velerov1api.PVCNamespaceNameLabel: "velero/testPVC", + velerov1api.ResourceUsageLabel: string(velerov1api.VeleroResourceUsageDataUploadResult), + })).Result(), + want: []*test.APIResource{}, + }, } for _, tc := range tests { @@ -2939,6 +3014,10 @@ func TestRestorePersistentVolumes(t *testing.T) { require.NoError(t, h.restorer.kbClient.Create(context.Background(), vsl)) } + if tc.dataUploadResult != nil { + require.NoError(t, h.restorer.kbClient.Create(context.TODO(), tc.dataUploadResult)) + } + for _, r := range tc.apiResources { h.AddItems(t, r) } @@ -2955,11 +3034,12 @@ func TestRestorePersistentVolumes(t *testing.T) { } data := &Request{ - Log: h.log, - Restore: tc.restore, - Backup: tc.backup, - VolumeSnapshots: tc.volumeSnapshots, - BackupReader: tc.tarball, + Log: h.log, + Restore: tc.restore, + Backup: tc.backup, + VolumeSnapshots: tc.volumeSnapshots, + BackupReader: tc.tarball, + CSIVolumeSnapshots: tc.csiVolumeSnapshots, } warnings, errs := h.restorer.Restore( data, @@ -3652,3 +3732,175 @@ func TestIsAlreadyExistsError(t *testing.T) { }) } } + +func TestHasCSIVolumeSnapshot(t *testing.T) { + tests := []struct { + name string + vs *snapshotv1api.VolumeSnapshot + obj *unstructured.Unstructured + expectedResult bool + }{ + { + name: "Invalid PV, expect false.", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": 1, + }, + }, + 
expectedResult: false, + }, + { + name: "Cannot find VS, expect false", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "PersistentVolume", + "apiVersion": "v1", + "metadata": map[string]interface{}{ + "namespace": "default", + "name": "test", + }, + }, + }, + expectedResult: false, + }, + { + name: "Find VS, expect true.", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "PersistentVolume", + "apiVersion": "v1", + "metadata": map[string]interface{}{ + "namespace": "velero", + "name": "test", + }, + "spec": map[string]interface{}{ + "claimRef": map[string]interface{}{ + "namespace": "velero", + "name": "test", + }, + }, + }, + }, + vs: builder.ForVolumeSnapshot("velero", "test").SourcePVC("test").Result(), + expectedResult: true, + }, + } + + for _, tc := range tests { + h := newHarness(t) + + ctx := &restoreContext{ + log: h.log, + } + + if tc.vs != nil { + ctx.csiVolumeSnapshots = []*snapshotv1api.VolumeSnapshot{tc.vs} + } + + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expectedResult, hasCSIVolumeSnapshot(ctx, tc.obj)) + }) + } +} + +func TestHasSnapshotDataUpload(t *testing.T) { + tests := []struct { + name string + duResult *corev1api.ConfigMap + obj *unstructured.Unstructured + expectedResult bool + restore *velerov1api.Restore + }{ + { + name: "Invalid PV, expect false.", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": 1, + }, + }, + expectedResult: false, + }, + { + name: "PV without ClaimRef, expect false", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "PersistentVolume", + "apiVersion": "v1", + "metadata": map[string]interface{}{ + "namespace": "default", + "name": "test", + }, + }, + }, + duResult: builder.ForConfigMap("velero", "test").Result(), + restore: builder.ForRestore("velero", "test").ObjectMeta(builder.WithUID("fakeUID")).Result(), + expectedResult: false, + }, + { + name: "Cannot find DataUploadResult CM, 
expect false", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "PersistentVolume", + "apiVersion": "v1", + "metadata": map[string]interface{}{ + "namespace": "default", + "name": "test", + }, + "spec": map[string]interface{}{ + "claimRef": map[string]interface{}{ + "namespace": "velero", + "name": "testPVC", + }, + }, + }, + }, + duResult: builder.ForConfigMap("velero", "test").Result(), + restore: builder.ForRestore("velero", "test").ObjectMeta(builder.WithUID("fakeUID")).Result(), + expectedResult: false, + }, + { + name: "Find DataUploadResult CM, expect true", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "PersistentVolume", + "apiVersion": "v1", + "metadata": map[string]interface{}{ + "namespace": "default", + "name": "test", + }, + "spec": map[string]interface{}{ + "claimRef": map[string]interface{}{ + "namespace": "velero", + "name": "testPVC", + }, + }, + }, + }, + duResult: builder.ForConfigMap("velero", "test").ObjectMeta(builder.WithLabelsMap(map[string]string{ + velerov1api.RestoreUIDLabel: "fakeUID", + velerov1api.PVCNamespaceNameLabel: "velero/testPVC", + velerov1api.ResourceUsageLabel: string(velerov1api.VeleroResourceUsageDataUploadResult), + })).Result(), + restore: builder.ForRestore("velero", "test").ObjectMeta(builder.WithUID("fakeUID")).Result(), + expectedResult: false, + }, + } + + for _, tc := range tests { + h := newHarness(t) + + ctx := &restoreContext{ + log: h.log, + kbClient: h.restorer.kbClient, + restore: tc.restore, + } + + if tc.duResult != nil { + require.NoError(t, ctx.kbClient.Create(context.TODO(), tc.duResult)) + } + + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expectedResult, hasSnapshotDataUpload(ctx, tc.obj)) + }) + } +} diff --git a/pkg/test/resources.go b/pkg/test/resources.go index 7c2fa17f6..709497fca 100644 --- a/pkg/test/resources.go +++ b/pkg/test/resources.go @@ -142,6 +142,17 @@ func ServiceAccounts(items ...metav1.Object) *APIResource { 
} } +func ConfigMaps(items ...metav1.Object) *APIResource { + return &APIResource{ + Group: "", + Version: "v1", + Name: "configmaps", + ShortName: "cm", + Namespaced: true, + Items: items, + } +} + func CRDs(items ...metav1.Object) *APIResource { return &APIResource{ Group: "apiextensions.k8s.io", diff --git a/pkg/test/test_logger.go b/pkg/test/test_logger.go index d8095a79d..b890fd5da 100644 --- a/pkg/test/test_logger.go +++ b/pkg/test/test_logger.go @@ -40,7 +40,7 @@ type singleLogRecorder struct { } func (s *singleLogRecorder) Write(p []byte) (n int, err error) { - *s.buffer = string(p[:]) + *s.buffer = *s.buffer + string(p[:]) return len(p), nil } diff --git a/site/content/docs/main/api-types/schedule.md b/site/content/docs/main/api-types/schedule.md index c2c55f6b7..eb8e8fbd8 100644 --- a/site/content/docs/main/api-types/schedule.md +++ b/site/content/docs/main/api-types/schedule.md @@ -41,6 +41,11 @@ spec: # CSI VolumeSnapshot status turns to ReadyToUse during creation, before # returning error as timeout. The default value is 10 minute. csiSnapshotTimeout: 10m + # resourcePolicy specifies the referenced resource policies that backup should follow + # optional + resourcePolicy: + kind: configmap + name: resource-policy-configmap # Array of namespaces to include in the scheduled backup. If unspecified, all namespaces are included. # Optional. includedNamespaces: diff --git a/site/content/docs/v1.12/api-types/schedule.md b/site/content/docs/v1.12/api-types/schedule.md index c2c55f6b7..eb8e8fbd8 100644 --- a/site/content/docs/v1.12/api-types/schedule.md +++ b/site/content/docs/v1.12/api-types/schedule.md @@ -41,6 +41,11 @@ spec: # CSI VolumeSnapshot status turns to ReadyToUse during creation, before # returning error as timeout. The default value is 10 minute. 
csiSnapshotTimeout: 10m + # resourcePolicy specifies the referenced resource policies that backup should follow + # optional + resourcePolicy: + kind: configmap + name: resource-policy-configmap # Array of namespaces to include in the scheduled backup. If unspecified, all namespaces are included. # Optional. includedNamespaces: