Merge branch 'main' into issue-fix-6693

This commit is contained in:
Lyndon-Li
2023-11-10 10:04:27 +08:00
27 changed files with 1140 additions and 51 deletions

View File

@@ -0,0 +1 @@
Add the implementation for design #6950, configurable data path concurrency

View File

@@ -0,0 +1 @@
Add DataUpload Result and CSI VolumeSnapshot check for restore PV.

View File

@@ -0,0 +1 @@
Truncate the credential file to avoid the change of secret content messing it up

View File

@@ -28,10 +28,15 @@ Therefore, in order to gain the optimized performance with the limited resources
We introduce a configMap named ```node-agent-configs``` for users to specify the node-agent related configurations. This configMap is not created by Velero, users should create it manually on demand. The configMap should be in the same namespace where Velero is installed. If multiple Velero instances are installed in different namespaces, there should be one configMap in each namespace which applies to node-agent in that namespace only.
Node-agent server checks these configurations at startup time and use it to initiate the related VGDP modules. Therefore, users could edit this configMap any time, but in order to make the changes effective, node-agent server needs to be restarted.
The ```node-agent-configs``` configMap may be used for other purpose of configuring node-agent in future, at present, there is only one kind of configuration as the data in the configMap, the name is ```data-path-concurrency```.
The ```node-agent-configs``` configMap may be used for other purpose of configuring node-agent in future, at present, there is only one kind of configuration as the data in the configMap, the name is ```dataPathConcurrency```.
The data structure for ```data-path-concurrency``` is as below:
The data structure for ```node-agent-configs``` is as below:
```go
type Configs struct {
// DataPathConcurrency is the config for data path concurrency per node.
DataPathConcurrency *DataPathConcurrency `json:"dataPathConcurrency,omitempty"`
}
type DataPathConcurrency struct {
// GlobalConfig specifies the concurrency number to all nodes for which per-node config is not specified
GlobalConfig int `json:"globalConfig,omitempty"`
@@ -50,7 +55,7 @@ type RuledConfigs struct {
```
### Global concurrent number
We allow users to specify a concurrent number that will be applied to all nodes if the per-node number is not specified. This number is set through ```globalConfig``` field in ```data-path-concurrency```.
We allow users to specify a concurrent number that will be applied to all nodes if the per-node number is not specified. This number is set through ```globalConfig``` field in ```dataPathConcurrency```.
The number starts from 1 which means there is no concurrency, only one instance of VGDP is allowed. There is no roof limit.
If this number is not specified or not valid, a hard-coded default value will be used, the value is set to 1.
@@ -62,7 +67,7 @@ We allow users to specify different concurrent number per node, for example, use
The range of Per-node concurrent number is the same with Global concurrent number.
Per-node concurrent number is preferable to Global concurrent number, so it will overwrite the Global concurrent number for that node.
Per-node concurrent number is implemented through ```perNodeConfig``` field in ```data-path-concurrency```.
Per-node concurrent number is implemented through ```perNodeConfig``` field in ```dataPathConcurrency```.
```perNodeConfig``` is a list of ```RuledConfigs``` each item of which matches one or more nodes by label selectors and specify the concurrent number for the matched nodes. This means, the nodes are identified by labels.
@@ -80,30 +85,32 @@ If one node falls into more than one rules, e.g., if node1 also has the label ``
A sample of the ```node-agent-configs``` configMap is as below:
```json
{
"globalConfig": 2,
"perNodeConfig": [
{
"nodeSelector": {
"matchLabels": {
"kubernetes.io/hostname": "node1"
}
"dataPathConcurrency": {
"globalConfig": 2,
"perNodeConfig": [
{
"nodeSelector": {
"matchLabels": {
"kubernetes.io/hostname": "node1"
}
},
"number": 3
},
"number": 3
},
{
"nodeSelector": {
"matchLabels": {
"beta.kubernetes.io/instance-type": "Standard_B4ms"
}
},
"number": 5
}
]
{
"nodeSelector": {
"matchLabels": {
"beta.kubernetes.io/instance-type": "Standard_B4ms"
}
},
"number": 5
}
]
}
}
```
To create the configMap, users need to save something like the above sample to a json file and then run below command:
```
kubectl create cm node-agent-configs -n velero --from-file=data-path-concurrency=<json file name>
kubectl create cm node-agent-configs -n velero --from-file=<json file name>
```
### Global data path manager

View File

@@ -71,7 +71,7 @@ func (n *namespacedFileStore) Path(selector *corev1api.SecretKeySelector) (strin
keyFilePath := filepath.Join(n.fsRoot, fmt.Sprintf("%s-%s", selector.Name, selector.Key))
file, err := n.fs.OpenFile(keyFilePath, os.O_RDWR|os.O_CREATE, 0644)
file, err := n.fs.OpenFile(keyFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return "", errors.Wrap(err, "unable to open credentials file for writing")
}

View File

@@ -41,6 +41,11 @@ func ForNode(name string) *NodeBuilder {
}
}
func (b *NodeBuilder) Labels(labels map[string]string) *NodeBuilder {
b.object.Labels = labels
return b
}
// Result returns the built Node.
func (b *NodeBuilder) Result() *corev1api.Node {
return b.object

View File

@@ -101,3 +101,8 @@ func (b *PodBuilder) ContainerStatuses(containerStatuses ...*corev1api.Container
}
return b
}
func (b *PodBuilder) Phase(phase corev1api.PodPhase) *PodBuilder {
b.object.Status.Phase = phase
return b
}

View File

@@ -67,3 +67,8 @@ func (v *VolumeSnapshotBuilder) BoundVolumeSnapshotContentName(vscName string) *
v.object.Status.BoundVolumeSnapshotContentName = &vscName
return v
}
func (v *VolumeSnapshotBuilder) SourcePVC(name string) *VolumeSnapshotBuilder {
v.object.Spec.Source.PersistentVolumeClaimName = &name
return v
}

View File

@@ -19,6 +19,7 @@ package nodeagent
import (
"context"
"fmt"
"math"
"net/http"
"os"
"strings"
@@ -32,6 +33,7 @@ import (
storagev1api "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
@@ -53,7 +55,9 @@ import (
"github.com/vmware-tanzu/velero/pkg/cmd"
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
"github.com/vmware-tanzu/velero/pkg/controller"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"
@@ -73,6 +77,7 @@ const (
defaultResourceTimeout = 10 * time.Minute
defaultDataMoverPrepareTimeout = 30 * time.Minute
defaultDataPathConcurrentNum = 1
)
type nodeAgentServerConfig struct {
@@ -132,6 +137,7 @@ type nodeAgentServer struct {
config nodeAgentServerConfig
kubeClient kubernetes.Interface
csiSnapshotClient *snapshotv1client.Clientset
dataPathMgr *datapath.Manager
}
func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, config nodeAgentServerConfig) (*nodeAgentServer, error) {
@@ -219,6 +225,10 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi
if err != nil {
return nil, err
}
dataPathConcurrentNum := s.getDataPathConcurrentNum(defaultDataPathConcurrentNum)
s.dataPathMgr = datapath.NewManager(dataPathConcurrentNum)
return s, nil
}
@@ -263,24 +273,24 @@ func (s *nodeAgentServer) run() {
credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}
repoEnsurer := repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout)
pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), repoEnsurer,
pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), s.dataPathMgr, repoEnsurer,
credentialGetter, s.nodeName, s.mgr.GetScheme(), s.metrics, s.logger)
if err := pvbReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.PodVolumeBackup)
}
if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil {
if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), s.dataPathMgr, repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.markDataUploadsCancel(dataUploadReconciler)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.markDataDownloadsCancel(dataDownloadReconciler)
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
@@ -478,3 +488,65 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) {
s.logger.WithField("podvolumerestore", pvr.GetName()).Warn(pvr.Status.Message)
}
}
var getConfigsFunc = nodeagent.GetConfigs
func (s *nodeAgentServer) getDataPathConcurrentNum(defaultNum int) int {
configs, err := getConfigsFunc(s.ctx, s.namespace, s.kubeClient)
if err != nil {
s.logger.WithError(err).Warn("Failed to get node agent configs")
return defaultNum
}
if configs == nil || configs.DataPathConcurrency == nil {
s.logger.Infof("Concurrency configs are not found, use the default number %v", defaultNum)
return defaultNum
}
globalNum := configs.DataPathConcurrency.GlobalConfig
if globalNum <= 0 {
s.logger.Warnf("Global number %v is invalid, use the default value %v", globalNum, defaultNum)
globalNum = defaultNum
}
if len(configs.DataPathConcurrency.PerNodeConfig) == 0 {
return globalNum
}
curNode, err := s.kubeClient.CoreV1().Nodes().Get(s.ctx, s.nodeName, metav1.GetOptions{})
if err != nil {
s.logger.WithError(err).Warnf("Failed to get node info for %s, use the global number %v", s.nodeName, globalNum)
return globalNum
}
concurrentNum := math.MaxInt32
for _, rule := range configs.DataPathConcurrency.PerNodeConfig {
selector, err := metav1.LabelSelectorAsSelector(&rule.NodeSelector)
if err != nil {
s.logger.WithError(err).Warnf("Failed to parse rule with label selector %s, skip it", rule.NodeSelector.String())
continue
}
if rule.Number <= 0 {
s.logger.Warnf("Rule with label selector %s is with an invalid number %v, skip it", rule.NodeSelector.String(), rule.Number)
continue
}
if selector.Matches(labels.Set(curNode.GetLabels())) {
if concurrentNum > rule.Number {
concurrentNum = rule.Number
}
}
}
if concurrentNum == math.MaxInt32 {
s.logger.Infof("Per node number for node %s is not found, use the global number %v", s.nodeName, globalNum)
concurrentNum = globalNum
} else {
s.logger.Infof("Use the per node number %v over global number %v for node %s", concurrentNum, globalNum, s.nodeName)
}
return concurrentNum
}

View File

@@ -17,16 +17,22 @@ package nodeagent
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
testutil "github.com/vmware-tanzu/velero/pkg/test"
)
@@ -107,3 +113,259 @@ func Test_validatePodVolumesHostPath(t *testing.T) {
})
}
}
func Test_getDataPathConcurrentNum(t *testing.T) {
defaultNum := 100001
globalNum := 6
nodeName := "node-agent-node"
node1 := builder.ForNode("node-agent-node").Result()
node2 := builder.ForNode("node-agent-node").Labels(map[string]string{
"host-name": "node-1",
"xxxx": "yyyyy",
}).Result()
invalidLabelSelector := metav1.LabelSelector{
MatchLabels: map[string]string{
"inva/lid": "inva/lid",
},
}
validLabelSelector1 := metav1.LabelSelector{
MatchLabels: map[string]string{
"host-name": "node-1",
},
}
validLabelSelector2 := metav1.LabelSelector{
MatchLabels: map[string]string{
"xxxx": "yyyyy",
},
}
tests := []struct {
name string
getFunc func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error)
setKubeClient bool
kubeClientObj []runtime.Object
expectNum int
expectLog string
}{
{
name: "failed to get configs",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return nil, errors.New("fake-get-error")
},
expectLog: "Failed to get node agent configs",
expectNum: defaultNum,
},
{
name: "configs cm not found",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return nil, nil
},
expectLog: fmt.Sprintf("Concurrency configs are not found, use the default number %v", defaultNum),
expectNum: defaultNum,
},
{
name: "configs cm's data path concurrency is nil",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{}, nil
},
expectLog: fmt.Sprintf("Concurrency configs are not found, use the default number %v", defaultNum),
expectNum: defaultNum,
},
{
name: "global number is invalid",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: -1,
},
}, nil
},
expectLog: fmt.Sprintf("Global number %v is invalid, use the default value %v", -1, defaultNum),
expectNum: defaultNum,
},
{
name: "global number is valid",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: globalNum,
},
}, nil
},
expectNum: globalNum,
},
{
name: "node is not found",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
Number: 100,
},
},
},
}, nil
},
setKubeClient: true,
expectLog: fmt.Sprintf("Failed to get node info for %s, use the global number %v", nodeName, globalNum),
expectNum: globalNum,
},
{
name: "failed to get selector",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: invalidLabelSelector,
Number: 100,
},
},
},
}, nil
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node1},
expectLog: fmt.Sprintf("Failed to parse rule with label selector %s, skip it", invalidLabelSelector.String()),
expectNum: globalNum,
},
{
name: "rule number is invalid",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: -1,
},
},
},
}, nil
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node1},
expectLog: fmt.Sprintf("Rule with label selector %s is with an invalid number %v, skip it", validLabelSelector1.String(), -1),
expectNum: globalNum,
},
{
name: "label doesn't match",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: -1,
},
},
},
}, nil
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node1},
expectLog: fmt.Sprintf("Per node number for node %s is not found, use the global number %v", nodeName, globalNum),
expectNum: globalNum,
},
{
name: "match one rule",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 66,
},
},
},
}, nil
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node2},
expectLog: fmt.Sprintf("Use the per node number %v over global number %v for node %s", 66, globalNum, nodeName),
expectNum: 66,
},
{
name: "match multiple rules",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 66,
},
{
NodeSelector: validLabelSelector2,
Number: 36,
},
},
},
}, nil
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node2},
expectLog: fmt.Sprintf("Use the per node number %v over global number %v for node %s", 36, globalNum, nodeName),
expectNum: 36,
},
{
name: "match multiple rules 2",
getFunc: func(context.Context, string, kubernetes.Interface) (*nodeagent.Configs, error) {
return &nodeagent.Configs{
DataPathConcurrency: &nodeagent.DataPathConcurrency{
GlobalConfig: globalNum,
PerNodeConfig: []nodeagent.RuledConfigs{
{
NodeSelector: validLabelSelector1,
Number: 36,
},
{
NodeSelector: validLabelSelector2,
Number: 66,
},
},
},
}, nil
},
setKubeClient: true,
kubeClientObj: []runtime.Object{node2},
expectLog: fmt.Sprintf("Use the per node number %v over global number %v for node %s", 36, globalNum, nodeName),
expectNum: 36,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
logBuffer := ""
s := &nodeAgentServer{
nodeName: nodeName,
logger: testutil.NewSingleLogger(&logBuffer),
}
if test.setKubeClient {
s.kubeClient = fakeKubeClient
}
getConfigsFunc = test.getFunc
num := s.getDataPathConcurrentNum(defaultNum)
assert.Equal(t, test.expectNum, num)
if test.expectLog == "" {
assert.Equal(t, "", logBuffer)
} else {
assert.True(t, strings.Contains(logBuffer, test.expectLog))
}
})
}
}

View File

@@ -69,7 +69,7 @@ type DataDownloadReconciler struct {
metrics *metrics.ServerMetrics
}
func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface,
func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
return &DataDownloadReconciler{
client: client,
@@ -81,7 +81,7 @@ func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Inter
nodeName: nodeName,
repositoryEnsurer: repoEnsurer,
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
dataPathMgr: datapath.NewManager(1),
dataPathMgr: dataPathMgr,
preparingTimeout: preparingTimeout,
metrics: metrics,
}

View File

@@ -142,7 +142,10 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...
if err != nil {
return nil, err
}
return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
dataPathMgr := datapath.NewManager(1)
return NewDataDownloadReconciler(fakeClient, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}
func TestDataDownloadReconcile(t *testing.T) {

View File

@@ -79,8 +79,8 @@ type DataUploadReconciler struct {
metrics *metrics.ServerMetrics
}
func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface,
csiSnapshotClient snapshotter.SnapshotV1Interface, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution,
func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface,
dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution,
cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler {
return &DataUploadReconciler{
client: client,
@@ -93,7 +93,7 @@ func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interfa
logger: log,
repoEnsurer: repoEnsurer,
snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(kubeClient, csiSnapshotClient, log)},
dataPathMgr: datapath.NewManager(1),
dataPathMgr: dataPathMgr,
preparingTimeout: preparingTimeout,
metrics: metrics,
}

View File

@@ -162,6 +162,8 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
Spec: appsv1.DaemonSetSpec{},
}
dataPathMgr := datapath.NewManager(1)
now, err := time.Parse(time.RFC1123, time.RFC1123)
if err != nil {
return nil, err
@@ -217,7 +219,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
if err != nil {
return nil, err
}
return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), nil,
return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil,
testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}

View File

@@ -47,7 +47,7 @@ import (
const pVBRRequestor string = "pod-volume-backup-restore"
// NewPodVolumeBackupReconciler creates the PodVolumeBackupReconciler instance
func NewPodVolumeBackupReconciler(client client.Client, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter,
func NewPodVolumeBackupReconciler(client client.Client, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter,
nodeName string, scheme *runtime.Scheme, metrics *metrics.ServerMetrics, logger logrus.FieldLogger) *PodVolumeBackupReconciler {
return &PodVolumeBackupReconciler{
Client: client,
@@ -59,7 +59,7 @@ func NewPodVolumeBackupReconciler(client client.Client, ensurer *repository.Ensu
clock: &clocks.RealClock{},
scheme: scheme,
metrics: metrics,
dataPathMgr: datapath.NewManager(1),
dataPathMgr: dataPathMgr,
}
}

View File

@@ -50,7 +50,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)
func NewPodVolumeRestoreReconciler(client client.Client, ensurer *repository.Ensurer,
func NewPodVolumeRestoreReconciler(client client.Client, dataPathMgr *datapath.Manager, ensurer *repository.Ensurer,
credentialGetter *credentials.CredentialGetter, logger logrus.FieldLogger) *PodVolumeRestoreReconciler {
return &PodVolumeRestoreReconciler{
Client: client,
@@ -59,7 +59,7 @@ func NewPodVolumeRestoreReconciler(client client.Client, ensurer *repository.Ens
credentialGetter: credentialGetter,
fileSystem: filesystem.NewFileSystem(),
clock: &clocks.RealClock{},
dataPathMgr: datapath.NewManager(1),
dataPathMgr: dataPathMgr,
}
}

View File

@@ -515,6 +515,11 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu
return errors.Wrap(err, "error fetching volume snapshots metadata")
}
csiVolumeSnapshots, err := backupStore.GetCSIVolumeSnapshots(restore.Spec.BackupName)
if err != nil {
return errors.Wrap(err, "fail to fetch CSI VolumeSnapshots metadata")
}
restoreLog.Info("starting restore")
var podVolumeBackups []*api.PodVolumeBackup
@@ -531,6 +536,7 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu
BackupReader: backupFile,
ResourceModifiers: resourceModifiers,
DisableInformerCache: r.disableInformerCache,
CSIVolumeSnapshots: csiVolumeSnapshots,
}
restoreWarnings, restoreErrors := r.restorer.RestoreWithResolvers(restoreReq, actionsResolver, pluginManager)

View File

@@ -23,6 +23,7 @@ import (
"testing"
"time"
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
@@ -471,6 +472,7 @@ func TestRestoreReconcile(t *testing.T) {
}
if test.expectedRestorerCall != nil {
backupStore.On("GetBackupContents", test.backup.Name).Return(io.NopCloser(bytes.NewReader([]byte("hello world"))), nil)
backupStore.On("GetCSIVolumeSnapshots", test.backup.Name).Return([]*snapshotv1api.VolumeSnapshot{}, nil)
restorer.On("RestoreWithResolvers", mock.Anything, mock.Anything, mock.Anything, mock.Anything,
mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(warnings, errors)

View File

@@ -18,6 +18,7 @@ package nodeagent
import (
"context"
"encoding/json"
"fmt"
"github.com/pkg/errors"
@@ -33,13 +34,36 @@ import (
const (
// daemonSet is the name of the Velero node agent daemonset.
daemonSet = "node-agent"
daemonSet = "node-agent"
configName = "node-agent-configs"
dataPathConConfigName = "data-path-concurrency"
)
var (
ErrDaemonSetNotFound = errors.New("daemonset not found")
)
type DataPathConcurrency struct {
// GlobalConfig specifies the concurrency number to all nodes for which per-node config is not specified
GlobalConfig int `json:"globalConfig,omitempty"`
// PerNodeConfig specifies the concurrency number to nodes matched by rules
PerNodeConfig []RuledConfigs `json:"perNodeConfig,omitempty"`
}
type RuledConfigs struct {
// NodeSelector specifies the label selector to match nodes
NodeSelector metav1.LabelSelector `json:"nodeSelector"`
// Number specifies the number value associated to the matched nodes
Number int `json:"number"`
}
type Configs struct {
// DataPathConcurrency is the config for data path concurrency per node.
DataPathConcurrency *DataPathConcurrency `json:"dataPathConcurrency,omitempty"`
}
// IsRunning checks if the node agent daemonset is running properly. If not, return the error found
func IsRunning(ctx context.Context, kubeClient kubernetes.Interface, namespace string) error {
if _, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, daemonSet, metav1.GetOptions{}); apierrors.IsNotFound(err) {
@@ -89,3 +113,31 @@ func GetPodSpec(ctx context.Context, kubeClient kubernetes.Interface, namespace
return &ds.Spec.Template.Spec, nil
}
func GetConfigs(ctx context.Context, namespace string, kubeClient kubernetes.Interface) (*Configs, error) {
cm, err := kubeClient.CoreV1().ConfigMaps(namespace).Get(ctx, configName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return nil, nil
} else {
return nil, errors.Wrapf(err, "error to get node agent configs %s", configName)
}
}
if cm.Data == nil {
return nil, errors.Errorf("data is not available in config map %s", configName)
}
jsonString := ""
for _, v := range cm.Data {
jsonString = v
}
configs := &Configs{}
err = json.Unmarshal([]byte(jsonString), configs)
if err != nil {
return nil, errors.Wrapf(err, "error to unmarshall configs from %s", configName)
}
return configs, nil
}

View File

@@ -0,0 +1,337 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodeagent
import (
"context"
"testing"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
clientTesting "k8s.io/client-go/testing"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/vmware-tanzu/velero/pkg/builder"
)
type reactor struct {
verb string
resource string
reactorFunc clientTesting.ReactionFunc
}
func TestIsRunning(t *testing.T) {
daemonSet := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-ns",
Name: "node-agent",
},
TypeMeta: metav1.TypeMeta{
Kind: "DaemonSet",
},
}
tests := []struct {
name string
kubeClientObj []runtime.Object
namespace string
kubeReactors []reactor
expectErr string
}{
{
name: "ds is not found",
namespace: "fake-ns",
expectErr: "daemonset not found",
},
{
name: "ds get error",
namespace: "fake-ns",
kubeReactors: []reactor{
{
verb: "get",
resource: "daemonsets",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-get-error")
},
},
},
expectErr: "fake-get-error",
},
{
name: "succeed",
namespace: "fake-ns",
kubeClientObj: []runtime.Object{
daemonSet,
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
for _, reactor := range test.kubeReactors {
fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
}
err := IsRunning(context.TODO(), fakeKubeClient, test.namespace)
if test.expectErr == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, test.expectErr)
}
})
}
}
func TestIsRunningInNode(t *testing.T) {
scheme := runtime.NewScheme()
corev1.AddToScheme(scheme)
nonNodeAgentPod := builder.ForPod("fake-ns", "fake-pod").Result()
nodeAgentPodNotRunning := builder.ForPod("fake-ns", "fake-pod").Labels(map[string]string{"name": "node-agent"}).Result()
nodeAgentPodRunning1 := builder.ForPod("fake-ns", "fake-pod-1").Labels(map[string]string{"name": "node-agent"}).Phase(corev1.PodRunning).Result()
nodeAgentPodRunning2 := builder.ForPod("fake-ns", "fake-pod-2").Labels(map[string]string{"name": "node-agent"}).Phase(corev1.PodRunning).Result()
nodeAgentPodRunning3 := builder.ForPod("fake-ns", "fake-pod-3").
Labels(map[string]string{"name": "node-agent"}).
Phase(corev1.PodRunning).
NodeName("fake-node").
Result()
tests := []struct {
name string
kubeClientObj []runtime.Object
nodeName string
expectErr string
}{
{
name: "node name is empty",
expectErr: "node name is empty",
},
{
name: "ds pod not found",
nodeName: "fake-node",
kubeClientObj: []runtime.Object{
nonNodeAgentPod,
},
expectErr: "daemonset pod not found in running state in node fake-node",
},
{
name: "ds po are not all running",
nodeName: "fake-node",
kubeClientObj: []runtime.Object{
nodeAgentPodNotRunning,
nodeAgentPodRunning1,
},
expectErr: "daemonset pod not found in running state in node fake-node",
},
{
name: "ds pods wrong node name",
nodeName: "fake-node",
kubeClientObj: []runtime.Object{
nodeAgentPodNotRunning,
nodeAgentPodRunning1,
nodeAgentPodRunning2,
},
expectErr: "daemonset pod not found in running state in node fake-node",
},
{
name: "succeed",
nodeName: "fake-node",
kubeClientObj: []runtime.Object{
nodeAgentPodNotRunning,
nodeAgentPodRunning1,
nodeAgentPodRunning2,
nodeAgentPodRunning3,
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeClientBuilder := clientFake.NewClientBuilder()
fakeClientBuilder = fakeClientBuilder.WithScheme(scheme)
fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()
err := IsRunningInNode(context.TODO(), "", test.nodeName, fakeClient)
if test.expectErr == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, test.expectErr)
}
})
}
}
func TestGetPodSpec(t *testing.T) {
podSpec := corev1.PodSpec{
NodeName: "fake-node",
}
daemonSet := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-ns",
Name: "node-agent",
},
TypeMeta: metav1.TypeMeta{
Kind: "DaemonSet",
},
Spec: appsv1.DaemonSetSpec{
Template: corev1.PodTemplateSpec{
Spec: podSpec,
},
},
}
tests := []struct {
name string
kubeClientObj []runtime.Object
namespace string
expectErr string
expectSpec corev1.PodSpec
}{
{
name: "ds is not found",
namespace: "fake-ns",
expectErr: "error to get node-agent daemonset: daemonsets.apps \"node-agent\" not found",
},
{
name: "succeed",
namespace: "fake-ns",
kubeClientObj: []runtime.Object{
daemonSet,
},
expectSpec: podSpec,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
spec, err := GetPodSpec(context.TODO(), fakeKubeClient, test.namespace)
if test.expectErr == "" {
assert.NoError(t, err)
assert.Equal(t, *spec, test.expectSpec)
} else {
assert.EqualError(t, err, test.expectErr)
}
})
}
}
func TestGetConfigs(t *testing.T) {
cm := builder.ForConfigMap("fake-ns", "node-agent-configs").Result()
cmWithInvalidDataFormat := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("fake-key", "wrong").Result()
cmWithoutCocurrentData := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("fake-key", "{\"someothers\":{\"someother\": 10}}").Result()
cmWithValidData := builder.ForConfigMap("fake-ns", "node-agent-configs").Data("fake-key", "{\"dataPathConcurrency\":{\"globalConfig\": 5}}").Result()
tests := []struct {
name string
kubeClientObj []runtime.Object
namespace string
kubeReactors []reactor
expectResult *Configs
expectErr string
}{
{
name: "cm is not found",
namespace: "fake-ns",
},
{
name: "cm get error",
namespace: "fake-ns",
kubeReactors: []reactor{
{
verb: "get",
resource: "configmaps",
reactorFunc: func(action clientTesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("fake-get-error")
},
},
},
expectErr: "error to get node agent configs node-agent-configs: fake-get-error",
},
{
name: "cm's data is nil",
namespace: "fake-ns",
kubeClientObj: []runtime.Object{
cm,
},
expectErr: "data is not available in config map node-agent-configs",
},
{
name: "cm's data is with invalid format",
namespace: "fake-ns",
kubeClientObj: []runtime.Object{
cmWithInvalidDataFormat,
},
expectErr: "error to unmarshall configs from node-agent-configs: invalid character 'w' looking for beginning of value",
},
{
name: "concurrency configs are not found",
namespace: "fake-ns",
kubeClientObj: []runtime.Object{
cmWithoutCocurrentData,
},
expectResult: &Configs{nil},
},
{
name: "success",
namespace: "fake-ns",
kubeClientObj: []runtime.Object{
cmWithValidData,
},
expectResult: &Configs{
DataPathConcurrency: &DataPathConcurrency{
GlobalConfig: 5,
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
for _, reactor := range test.kubeReactors {
fakeKubeClient.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorFunc)
}
result, err := GetConfigs(context.TODO(), test.namespace, fakeKubeClient)
if test.expectErr == "" {
assert.NoError(t, err)
if test.expectResult == nil {
assert.Nil(t, result)
} else if test.expectResult.DataPathConcurrency == nil {
assert.Nil(t, result.DataPathConcurrency)
} else {
assert.Equal(t, *test.expectResult.DataPathConcurrency, *result.DataPathConcurrency)
}
} else {
assert.EqualError(t, err, test.expectErr)
}
})
}
}

View File

@@ -21,6 +21,7 @@ import (
"io"
"sort"
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/runtime"
@@ -60,6 +61,7 @@ type Request struct {
itemOperationsList *[]*itemoperation.RestoreOperation
ResourceModifiers *resourcemodifiers.ResourceModifiers
DisableInformerCache bool
CSIVolumeSnapshots []*snapshotv1api.VolumeSnapshot
}
type restoredItemStatus struct {

View File

@@ -30,6 +30,7 @@ import (
"time"
"github.com/google/uuid"
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
@@ -302,6 +303,7 @@ func (kr *kubernetesRestorer) RestoreWithResolvers(
pvsToProvision: sets.NewString(),
pvRestorer: pvRestorer,
volumeSnapshots: req.VolumeSnapshots,
csiVolumeSnapshots: req.CSIVolumeSnapshots,
podVolumeBackups: req.PodVolumeBackups,
resourceTerminatingTimeout: kr.resourceTerminatingTimeout,
resourceTimeout: kr.resourceTimeout,
@@ -352,6 +354,7 @@ type restoreContext struct {
pvsToProvision sets.String
pvRestorer PVRestorer
volumeSnapshots []*volume.Snapshot
csiVolumeSnapshots []*snapshotv1api.VolumeSnapshot
podVolumeBackups []*velerov1api.PodVolumeBackup
resourceTerminatingTimeout time.Duration
resourceTimeout time.Duration
@@ -1293,7 +1296,11 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
}
case hasPodVolumeBackup(obj, ctx):
ctx.log.Infof("Dynamically re-provisioning persistent volume because it has a pod volume backup to be restored.")
ctx.log.WithFields(logrus.Fields{
"namespace": obj.GetNamespace(),
"name": obj.GetName(),
"groupResource": groupResource.String(),
}).Infof("Dynamically re-provisioning persistent volume because it has a pod volume backup to be restored.")
ctx.pvsToProvision.Insert(name)
// Return early because we don't want to restore the PV itself, we
@@ -1301,15 +1308,45 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
return warnings, errs, itemExists
case hasCSIVolumeSnapshot(ctx, obj):
case hasSnapshotDataUpload(ctx, obj):
ctx.log.WithFields(logrus.Fields{
"namespace": obj.GetNamespace(),
"name": obj.GetName(),
"groupResource": groupResource.String(),
}).Infof("Dynamically re-provisioning persistent volume because it has a related CSI VolumeSnapshot.")
ctx.pvsToProvision.Insert(name)
if ready, err := ctx.featureVerifier.Verify(velerov1api.CSIFeatureFlag); !ready {
ctx.log.Errorf("Failed to verify CSI modules, ready %v, err %v", ready, err)
errs.Add(namespace, fmt.Errorf("CSI modules are not ready for restore. Check CSI feature is enabled and CSI plugin is installed"))
return warnings, errs, itemExists
}
// Return early because we don't want to restore the PV itself, we
// want to dynamically re-provision it.
return warnings, errs, itemExists
case hasSnapshotDataUpload(ctx, obj):
ctx.log.WithFields(logrus.Fields{
"namespace": obj.GetNamespace(),
"name": obj.GetName(),
"groupResource": groupResource.String(),
}).Infof("Dynamically re-provisioning persistent volume because it has a related snapshot DataUpload.")
ctx.pvsToProvision.Insert(name)
if ready, err := ctx.featureVerifier.Verify(velerov1api.CSIFeatureFlag); !ready {
ctx.log.Errorf("Failed to verify CSI modules, ready %v, err %v", ready, err)
errs.Add(namespace, fmt.Errorf("CSI modules are not ready for restore. Check CSI feature is enabled and CSI plugin is installed"))
}
// Return early because we don't want to restore the PV itself, we
// want to dynamically re-provision it.
return warnings, errs, itemExists
case hasDeleteReclaimPolicy(obj.Object):
ctx.log.Infof("Dynamically re-provisioning persistent volume because it doesn't have a snapshot and its reclaim policy is Delete.")
ctx.log.WithFields(logrus.Fields{
"namespace": obj.GetNamespace(),
"name": obj.GetName(),
"groupResource": groupResource.String(),
}).Infof("Dynamically re-provisioning persistent volume because it doesn't have a snapshot and its reclaim policy is Delete.")
ctx.pvsToProvision.Insert(name)
// Return early because we don't want to restore the PV itself, we
@@ -1317,7 +1354,11 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
return warnings, errs, itemExists
default:
ctx.log.Infof("Restoring persistent volume as-is because it doesn't have a snapshot and its reclaim policy is not Delete.")
ctx.log.WithFields(logrus.Fields{
"namespace": obj.GetNamespace(),
"name": obj.GetName(),
"groupResource": groupResource.String(),
}).Infof("Restoring persistent volume as-is because it doesn't have a snapshot and its reclaim policy is not Delete.")
// Check to see if the claimRef.namespace field needs to be remapped, and do so if necessary.
_, err = remapClaimRefNS(ctx, obj)
@@ -1952,6 +1993,18 @@ func hasSnapshot(pvName string, snapshots []*volume.Snapshot) bool {
}
func hasCSIVolumeSnapshot(ctx *restoreContext, unstructuredPV *unstructured.Unstructured) bool {
pv := new(v1.PersistentVolume)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredPV.Object, pv); err != nil {
ctx.log.WithError(err).Warnf("Unable to convert PV from unstructured to structured")
return false
}
for _, vs := range ctx.csiVolumeSnapshots {
if pv.Spec.ClaimRef.Name == *vs.Spec.Source.PersistentVolumeClaimName &&
pv.Spec.ClaimRef.Namespace == vs.Namespace {
return true
}
}
return false
}

View File

@@ -25,6 +25,7 @@ import (
"testing"
"time"
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
@@ -2256,6 +2257,7 @@ func (*volumeSnapshotter) DeleteSnapshot(snapshotID string) error {
// Verification is done by looking at the contents of the API and the metadata/spec/status of
// the items in the API.
func TestRestorePersistentVolumes(t *testing.T) {
testPVCName := "testPVC"
tests := []struct {
name string
restore *velerov1api.Restore
@@ -2265,6 +2267,8 @@ func TestRestorePersistentVolumes(t *testing.T) {
volumeSnapshots []*volume.Snapshot
volumeSnapshotLocations []*velerov1api.VolumeSnapshotLocation
volumeSnapshotterGetter volumeSnapshotterGetter
csiVolumeSnapshots []*snapshotv1api.VolumeSnapshot
dataUploadResult *corev1api.ConfigMap
want []*test.APIResource
wantError bool
wantWarning bool
@@ -2923,6 +2927,77 @@ func TestRestorePersistentVolumes(t *testing.T) {
),
},
},
{
name: "when a PV with a reclaim policy of retain has a CSI VolumeSnapshot and does not exist in-cluster, the PV is not restored",
restore: defaultRestore().Result(),
backup: defaultBackup().Result(),
tarball: test.NewTarWriter(t).
AddItems("persistentvolumes",
builder.ForPersistentVolume("pv-1").
ReclaimPolicy(corev1api.PersistentVolumeReclaimRetain).
ClaimRef("velero", testPVCName).
Result(),
).
Done(),
apiResources: []*test.APIResource{
test.PVs(),
test.PVCs(),
},
csiVolumeSnapshots: []*snapshotv1api.VolumeSnapshot{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "test",
},
Spec: snapshotv1api.VolumeSnapshotSpec{
Source: snapshotv1api.VolumeSnapshotSource{
PersistentVolumeClaimName: &testPVCName,
},
},
},
},
volumeSnapshotLocations: []*velerov1api.VolumeSnapshotLocation{
builder.ForVolumeSnapshotLocation(velerov1api.DefaultNamespace, "default").Provider("provider-1").Result(),
},
volumeSnapshotterGetter: map[string]vsv1.VolumeSnapshotter{
"provider-1": &volumeSnapshotter{
snapshotVolumes: map[string]string{"snapshot-1": "new-volume"},
},
},
want: []*test.APIResource{},
},
{
name: "when a PV with a reclaim policy of retain has a DataUpload result CM and does not exist in-cluster, the PV is not restored",
restore: defaultRestore().ObjectMeta(builder.WithUID("fakeUID")).Result(),
backup: defaultBackup().Result(),
tarball: test.NewTarWriter(t).
AddItems("persistentvolumes",
builder.ForPersistentVolume("pv-1").
ReclaimPolicy(corev1api.PersistentVolumeReclaimRetain).
ClaimRef("velero", testPVCName).
Result(),
).
Done(),
apiResources: []*test.APIResource{
test.PVs(),
test.PVCs(),
test.ConfigMaps(),
},
volumeSnapshotLocations: []*velerov1api.VolumeSnapshotLocation{
builder.ForVolumeSnapshotLocation(velerov1api.DefaultNamespace, "default").Provider("provider-1").Result(),
},
volumeSnapshotterGetter: map[string]vsv1.VolumeSnapshotter{
"provider-1": &volumeSnapshotter{
snapshotVolumes: map[string]string{"snapshot-1": "new-volume"},
},
},
dataUploadResult: builder.ForConfigMap("velero", "test").ObjectMeta(builder.WithLabelsMap(map[string]string{
velerov1api.RestoreUIDLabel: "fakeUID",
velerov1api.PVCNamespaceNameLabel: "velero/testPVC",
velerov1api.ResourceUsageLabel: string(velerov1api.VeleroResourceUsageDataUploadResult),
})).Result(),
want: []*test.APIResource{},
},
}
for _, tc := range tests {
@@ -2939,6 +3014,10 @@ func TestRestorePersistentVolumes(t *testing.T) {
require.NoError(t, h.restorer.kbClient.Create(context.Background(), vsl))
}
if tc.dataUploadResult != nil {
require.NoError(t, h.restorer.kbClient.Create(context.TODO(), tc.dataUploadResult))
}
for _, r := range tc.apiResources {
h.AddItems(t, r)
}
@@ -2955,11 +3034,12 @@ func TestRestorePersistentVolumes(t *testing.T) {
}
data := &Request{
Log: h.log,
Restore: tc.restore,
Backup: tc.backup,
VolumeSnapshots: tc.volumeSnapshots,
BackupReader: tc.tarball,
Log: h.log,
Restore: tc.restore,
Backup: tc.backup,
VolumeSnapshots: tc.volumeSnapshots,
BackupReader: tc.tarball,
CSIVolumeSnapshots: tc.csiVolumeSnapshots,
}
warnings, errs := h.restorer.Restore(
data,
@@ -3652,3 +3732,175 @@ func TestIsAlreadyExistsError(t *testing.T) {
})
}
}
func TestHasCSIVolumeSnapshot(t *testing.T) {
tests := []struct {
name string
vs *snapshotv1api.VolumeSnapshot
obj *unstructured.Unstructured
expectedResult bool
}{
{
name: "Invalid PV, expect false.",
obj: &unstructured.Unstructured{
Object: map[string]interface{}{
"kind": 1,
},
},
expectedResult: false,
},
{
name: "Cannot find VS, expect false",
obj: &unstructured.Unstructured{
Object: map[string]interface{}{
"kind": "PersistentVolume",
"apiVersion": "v1",
"metadata": map[string]interface{}{
"namespace": "default",
"name": "test",
},
},
},
expectedResult: false,
},
{
name: "Find VS, expect true.",
obj: &unstructured.Unstructured{
Object: map[string]interface{}{
"kind": "PersistentVolume",
"apiVersion": "v1",
"metadata": map[string]interface{}{
"namespace": "velero",
"name": "test",
},
"spec": map[string]interface{}{
"claimRef": map[string]interface{}{
"namespace": "velero",
"name": "test",
},
},
},
},
vs: builder.ForVolumeSnapshot("velero", "test").SourcePVC("test").Result(),
expectedResult: true,
},
}
for _, tc := range tests {
h := newHarness(t)
ctx := &restoreContext{
log: h.log,
}
if tc.vs != nil {
ctx.csiVolumeSnapshots = []*snapshotv1api.VolumeSnapshot{tc.vs}
}
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expectedResult, hasCSIVolumeSnapshot(ctx, tc.obj))
})
}
}
func TestHasSnapshotDataUpload(t *testing.T) {
tests := []struct {
name string
duResult *corev1api.ConfigMap
obj *unstructured.Unstructured
expectedResult bool
restore *velerov1api.Restore
}{
{
name: "Invalid PV, expect false.",
obj: &unstructured.Unstructured{
Object: map[string]interface{}{
"kind": 1,
},
},
expectedResult: false,
},
{
name: "PV without ClaimRef, expect false",
obj: &unstructured.Unstructured{
Object: map[string]interface{}{
"kind": "PersistentVolume",
"apiVersion": "v1",
"metadata": map[string]interface{}{
"namespace": "default",
"name": "test",
},
},
},
duResult: builder.ForConfigMap("velero", "test").Result(),
restore: builder.ForRestore("velero", "test").ObjectMeta(builder.WithUID("fakeUID")).Result(),
expectedResult: false,
},
{
name: "Cannot find DataUploadResult CM, expect false",
obj: &unstructured.Unstructured{
Object: map[string]interface{}{
"kind": "PersistentVolume",
"apiVersion": "v1",
"metadata": map[string]interface{}{
"namespace": "default",
"name": "test",
},
"spec": map[string]interface{}{
"claimRef": map[string]interface{}{
"namespace": "velero",
"name": "testPVC",
},
},
},
},
duResult: builder.ForConfigMap("velero", "test").Result(),
restore: builder.ForRestore("velero", "test").ObjectMeta(builder.WithUID("fakeUID")).Result(),
expectedResult: false,
},
{
name: "Find DataUploadResult CM, expect true",
obj: &unstructured.Unstructured{
Object: map[string]interface{}{
"kind": "PersistentVolume",
"apiVersion": "v1",
"metadata": map[string]interface{}{
"namespace": "default",
"name": "test",
},
"spec": map[string]interface{}{
"claimRef": map[string]interface{}{
"namespace": "velero",
"name": "testPVC",
},
},
},
},
duResult: builder.ForConfigMap("velero", "test").ObjectMeta(builder.WithLabelsMap(map[string]string{
velerov1api.RestoreUIDLabel: "fakeUID",
velerov1api.PVCNamespaceNameLabel: "velero/testPVC",
velerov1api.ResourceUsageLabel: string(velerov1api.VeleroResourceUsageDataUploadResult),
})).Result(),
restore: builder.ForRestore("velero", "test").ObjectMeta(builder.WithUID("fakeUID")).Result(),
expectedResult: false,
},
}
for _, tc := range tests {
h := newHarness(t)
ctx := &restoreContext{
log: h.log,
kbClient: h.restorer.kbClient,
restore: tc.restore,
}
if tc.duResult != nil {
require.NoError(t, ctx.kbClient.Create(context.TODO(), tc.duResult))
}
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expectedResult, hasSnapshotDataUpload(ctx, tc.obj))
})
}
}

View File

@@ -142,6 +142,17 @@ func ServiceAccounts(items ...metav1.Object) *APIResource {
}
}
func ConfigMaps(items ...metav1.Object) *APIResource {
return &APIResource{
Group: "",
Version: "v1",
Name: "configmaps",
ShortName: "cm",
Namespaced: true,
Items: items,
}
}
func CRDs(items ...metav1.Object) *APIResource {
return &APIResource{
Group: "apiextensions.k8s.io",

View File

@@ -40,7 +40,7 @@ type singleLogRecorder struct {
}
func (s *singleLogRecorder) Write(p []byte) (n int, err error) {
*s.buffer = string(p[:])
*s.buffer = *s.buffer + string(p[:])
return len(p), nil
}

View File

@@ -41,6 +41,11 @@ spec:
# CSI VolumeSnapshot status turns to ReadyToUse during creation, before
# returning error as timeout. The default value is 10 minute.
csiSnapshotTimeout: 10m
# resourcePolicy specifies the referenced resource policies that backup should follow
# optional
resourcePolicy:
kind: configmap
name: resource-policy-configmap
# Array of namespaces to include in the scheduled backup. If unspecified, all namespaces are included.
# Optional.
includedNamespaces:

View File

@@ -41,6 +41,11 @@ spec:
# CSI VolumeSnapshot status turns to ReadyToUse during creation, before
# returning error as timeout. The default value is 10 minute.
csiSnapshotTimeout: 10m
# resourcePolicy specifies the referenced resource policies that backup should follow
# optional
resourcePolicy:
kind: configmap
name: resource-policy-configmap
# Array of namespaces to include in the scheduled backup. If unspecified, all namespaces are included.
# Optional.
includedNamespaces: