Merge branch 'main' of https://github.com/qiuming-best/velero into mark-crs-failed

This commit is contained in:
Ming
2023-07-11 02:09:48 +00:00
39 changed files with 807 additions and 181 deletions

View File

@@ -0,0 +1 @@
Add exit code log and possible memory shortage warning log for Restic command failure.

View File

@@ -0,0 +1 @@
Add support for OpenStack CSI drivers topology keys

View File

@@ -41,6 +41,10 @@ spec:
jsonPath: .metadata.creationTimestamp
name: Age
type: date
- description: Name of the node where the DataDownload is processed
jsonPath: .status.node
name: Node
type: string
name: v2alpha1
schema:
openAPIV3Schema:
@@ -132,6 +136,9 @@ spec:
message:
description: Message is a message about the DataDownload's status.
type: string
node:
description: Node is name of the node where the DataDownload is processed.
type: string
phase:
description: Phase is the current state of the DataDownload.
enum:

View File

@@ -42,6 +42,10 @@ spec:
jsonPath: .metadata.creationTimestamp
name: Age
type: date
- description: Name of the node where the DataUpload is processed
jsonPath: .status.node
name: Node
type: string
name: v2alpha1
schema:
openAPIV3Schema:
@@ -147,6 +151,9 @@ spec:
message:
description: Message is a message about the DataUpload's status.
type: string
node:
description: Node is name of the node where the DataUpload is processed.
type: string
path:
description: Path is the full path of the snapshot volume being backed
up.

File diff suppressed because one or more lines are too long

4
go.mod
View File

@@ -1,6 +1,6 @@
module github.com/vmware-tanzu/velero
go 1.18
go 1.20
require (
cloud.google.com/go/storage v1.30.1
@@ -35,6 +35,7 @@ require (
github.com/stretchr/testify v1.8.2
github.com/vmware-tanzu/crash-diagnostics v0.3.7
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20221028150844-83b7d23a625f
golang.org/x/mod v0.10.0
golang.org/x/net v0.9.0
golang.org/x/oauth2 v0.7.0
@@ -139,7 +140,6 @@ require (
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/exp v0.0.0-20221028150844-83b7d23a625f // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect

View File

@@ -111,6 +111,10 @@ type DataDownloadStatus struct {
// about the restore operation.
// +optional
Progress shared.DataMoveOperationProgress `json:"progress,omitempty"`
// Node is name of the node where the DataDownload is processed.
// +optional
Node string `json:"node,omitempty"`
}
// TODO(2.0) After converting all resources to use the runtime-controller client, the genclient and k8s:deepcopy markers will no longer be needed and should be removed.
@@ -125,6 +129,7 @@ type DataDownloadStatus struct {
// +kubebuilder:printcolumn:name="Total Bytes",type="integer",format="int64",JSONPath=".status.progress.totalBytes",description="Total bytes"
// +kubebuilder:printcolumn:name="Storage Location",type="string",JSONPath=".spec.backupStorageLocation",description="Name of the Backup Storage Location where the backup data is stored"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description="Time duration since this DataDownload was created"
// +kubebuilder:printcolumn:name="Node",type="string",JSONPath=".status.node",description="Name of the node where the DataDownload is processed"
type DataDownload struct {
metav1.TypeMeta `json:",inline"`

View File

@@ -140,6 +140,10 @@ type DataUploadStatus struct {
// about the backup operation.
// +optional
Progress shared.DataMoveOperationProgress `json:"progress,omitempty"`
// Node is name of the node where the DataUpload is processed.
// +optional
Node string `json:"node,omitempty"`
}
// TODO(2.0) After converting all resources to use the runtime-controller client,
@@ -155,6 +159,7 @@ type DataUploadStatus struct {
// +kubebuilder:printcolumn:name="Total Bytes",type="integer",format="int64",JSONPath=".status.progress.totalBytes",description="Total bytes"
// +kubebuilder:printcolumn:name="Storage Location",type="string",JSONPath=".spec.backupStorageLocation",description="Name of the Backup Storage Location where this backup should be stored"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description="Time duration since this DataUpload was created"
// +kubebuilder:printcolumn:name="Node",type="string",JSONPath=".status.node",description="Name of the node where the DataUpload is processed"
type DataUpload struct {
metav1.TypeMeta `json:",inline"`

View File

@@ -445,6 +445,10 @@ const (
azureCsiZoneKey = "topology.disk.csi.azure.com/zone"
gkeCsiZoneKey = "topology.gke.io/zone"
gkeZoneSeparator = "__"
// OpenStack CSI drivers topology keys.
// Each constant holds the zone topology key of the driver it is named after
// (the values were previously swapped between the two names).
cinderCsiZoneKey = "topology.cinder.csi.openstack.org/zone"
manilaCsiZoneKey = "topology.manila.csi.openstack.org/zone"
)
// takePVSnapshot triggers a snapshot for the volume/disk underlying a PersistentVolume if the provided
@@ -506,7 +510,7 @@ func (ib *itemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.Fie
if !labelFound {
var k string
log.Infof("label %q is not present on PersistentVolume", zoneLabelDeprecated)
k, pvFailureDomainZone = zoneFromPVNodeAffinity(pv, awsEbsCsiZoneKey, azureCsiZoneKey, gkeCsiZoneKey, zoneLabel, zoneLabelDeprecated)
k, pvFailureDomainZone = zoneFromPVNodeAffinity(pv, awsEbsCsiZoneKey, azureCsiZoneKey, gkeCsiZoneKey, cinderCsiZoneKey, manilaCsiZoneKey, zoneLabel, zoneLabelDeprecated)
if pvFailureDomainZone != "" {
log.Infof("zone info from nodeAffinity requirements: %s, key: %s", pvFailureDomainZone, k)
} else {

View File

@@ -110,3 +110,9 @@ func (d *DataDownloadBuilder) ObjectMeta(opts ...ObjectMetaOpt) *DataDownloadBui
return d
}
// StartTimestamp stores startTime in the Status.StartTimestamp field of the
// DataDownload under construction and returns the builder so that calls can
// be chained.
func (d *DataDownloadBuilder) StartTimestamp(startTime *metav1.Time) *DataDownloadBuilder {
	status := &d.object.Status
	status.StartTimestamp = startTime
	return d
}

View File

@@ -113,3 +113,9 @@ func (d *DataUploadBuilder) CSISnapshot(cSISnapshot *velerov2alpha1api.CSISnapsh
d.object.Spec.CSISnapshot = cSISnapshot
return d
}
// StartTimestamp stores startTime in the Status.StartTimestamp field of the
// DataUpload under construction and returns the builder so that calls can be
// chained.
func (d *DataUploadBuilder) StartTimestamp(startTime *metav1.Time) *DataUploadBuilder {
	status := &d.object.Status
	status.StartTimestamp = startTime
	return d
}

View File

@@ -71,20 +71,23 @@ const (
// files will be written to
defaultCredentialsDirectory = "/tmp/credentials"
defaultResourceTimeout = 10 * time.Minute
defaultResourceTimeout = 10 * time.Minute
defaultDataMoverPrepareTimeout = 30 * time.Minute
)
type nodeAgentServerConfig struct {
metricsAddress string
resourceTimeout time.Duration
metricsAddress string
resourceTimeout time.Duration
dataMoverPrepareTimeout time.Duration
}
func NewServerCommand(f client.Factory) *cobra.Command {
logLevelFlag := logging.LogLevelFlag(logrus.InfoLevel)
formatFlag := logging.NewFormatFlag()
config := nodeAgentServerConfig{
metricsAddress: defaultMetricsAddress,
resourceTimeout: defaultResourceTimeout,
metricsAddress: defaultMetricsAddress,
resourceTimeout: defaultResourceTimeout,
dataMoverPrepareTimeout: defaultDataMoverPrepareTimeout,
}
command := &cobra.Command{
@@ -110,6 +113,7 @@ func NewServerCommand(f client.Factory) *cobra.Command {
command.Flags().Var(logLevelFlag, "log-level", fmt.Sprintf("The level at which to log. Valid values are %s.", strings.Join(logLevelFlag.AllowedValues(), ", ")))
command.Flags().Var(formatFlag, "log-format", fmt.Sprintf("The format for log output. Valid values are %s.", strings.Join(formatFlag.AllowedValues(), ", ")))
command.Flags().DurationVar(&config.resourceTimeout, "resource-timeout", config.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters. Default is 10 minutes.")
command.Flags().DurationVar(&config.dataMoverPrepareTimeout, "data-mover-prepare-timeout", config.dataMoverPrepareTimeout, "How long to wait for preparing a DataUpload/DataDownload. Default is 30 minutes.")
return command
}
@@ -256,13 +260,13 @@ func (s *nodeAgentServer) run() {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.logger)
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger)
s.markDataUploadsCancel(dataUploadReconciler)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.logger)
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger)
s.markDataDownloadsCancel(dataDownloadReconciler)
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")

View File

@@ -62,10 +62,11 @@ type DataDownloadReconciler struct {
nodeName string
repositoryEnsurer *repository.Ensurer
dataPathMgr *datapath.Manager
preparingTimeout time.Duration
}
func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface,
repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, logger logrus.FieldLogger) *DataDownloadReconciler {
repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger) *DataDownloadReconciler {
return &DataDownloadReconciler{
client: client,
kubeClient: kubeClient,
@@ -77,6 +78,7 @@ func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Inter
repositoryEnsurer: repoEnsurer,
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
dataPathMgr: datapath.NewManager(1),
preparingTimeout: preparingTimeout,
}
}
@@ -154,7 +156,12 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
if dd.Spec.Cancel {
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
} else if dd.Status.StartTimestamp != nil {
if time.Since(dd.Status.StartTimestamp.Time) >= r.preparingTimeout {
r.onPrepareTimeout(ctx, dd)
}
}
return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
log.Info("Data download is prepared")
@@ -203,7 +210,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
// Update status to InProgress
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Unable to update status to in progress")
return ctrl.Result{}, err
@@ -364,8 +370,15 @@ func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, nam
// re-enqueue the previous related request once the related pod is in running status to keep going on the rest logic. and below logic will avoid handling the unwanted
// pod status and also avoid block others CR handling
func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(r.logger, r.client, &velerov2alpha1api.DataDownloadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{})
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
dd := object.(*velerov2alpha1api.DataDownload)
return (dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted)
})
return ctrl.NewControllerManagedBy(mgr).
For(&velerov2alpha1api.DataDownload{}).
Watches(s, nil, builder.WithPredicates(gp)).
Watches(&source.Kind{Type: &v1.Pod{}}, kube.EnqueueRequestsFromMapUpdateFunc(r.findSnapshotRestoreForPod),
builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
@@ -417,9 +430,15 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object)
requests := make([]reconcile.Request, 1)
r.logger.WithField("Restore pod", pod.Name).Infof("Preparing data download %s", dd.Name)
err = r.patchDataDownload(context.Background(), dd, prepareDataDownload)
if err != nil {
r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to patch data download")
// we don't expect anyone else to update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
if err != nil || !updated {
r.logger.WithFields(logrus.Fields{
"Datadownload": dd.Name,
"Restore pod": pod.Name,
"updated": updated,
}).WithError(err).Warn("failed to patch datadownload, prepare will halt for this datadownload")
return []reconcile.Request{}
}
@@ -457,18 +476,9 @@ func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli clie
return dataDownloads, nil
}
func (r *DataDownloadReconciler) patchDataDownload(ctx context.Context, req *velerov2alpha1api.DataDownload, mutate func(*velerov2alpha1api.DataDownload)) error {
original := req.DeepCopy()
mutate(req)
if err := r.client.Patch(ctx, req, client.MergeFrom(original)); err != nil {
return errors.Wrap(err, "error patching data download")
}
return nil
}
func prepareDataDownload(ssb *velerov2alpha1api.DataDownload) {
func (r *DataDownloadReconciler) prepareDataDownload(ssb *velerov2alpha1api.DataDownload) {
ssb.Status.Phase = velerov2alpha1api.DataDownloadPhasePrepared
ssb.Status.Node = r.nodeName
}
func (r *DataDownloadReconciler) errorOut(ctx context.Context, dd *velerov2alpha1api.DataDownload, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
@@ -493,17 +503,62 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v
}
// acceptDataDownload tries to claim dd for this reconciler by setting its
// phase to Accepted and stamping Status.StartTimestamp with r.Clock.Now().
//
// The write goes through exclusiveUpdateDataDownload, which works on a deep
// copy, so dd itself is not modified here.
//
// Returns:
//   - (true, nil) when this reconciler's update was applied;
//   - (false, nil) when the exclusive update reports it did not succeed
//     without an error (the CR was updated by someone else);
//   - (false, err) when the update failed with any other error.
func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload) (bool, error) {
	r.logger.Infof("Accepting data download %s", dd.Name)

	// Every data download controller (one per node-agent) tries to update the
	// same DataDownload CR; only one of them succeeds, and that one carries on
	// with the later logic.
	succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(target *velerov2alpha1api.DataDownload) {
		target.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted
		target.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
	})
	if err != nil {
		return false, err
	}

	if succeeded {
		r.logger.WithField("DataDownload", dd.Name).Infof("This datadownload has been accepted by %s", r.nodeName)
		return true, nil
	}

	r.logger.WithField("DataDownload", dd.Name).Info("This datadownload has been accepted by others")
	return false, nil
}
// onPrepareTimeout handles a DataDownload whose preparation has timed out.
//
// It marks the DataDownload as Failed with a timeout message through
// exclusiveUpdateDataDownload. Only when that update is applied by this
// reconciler does it go on to call restoreExposer.CleanUp for the
// DataDownload's owner object. Update errors and lost updates are logged as
// warnings and end the handling without cleanup.
func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *velerov2alpha1api.DataDownload) {
	log := r.logger.WithField("DataDownload", dd.Name)

	log.Info("Timeout happened for preparing datadownload")

	succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(target *velerov2alpha1api.DataDownload) {
		target.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
		target.Status.Message = "timeout on preparing data download"
	})
	if err != nil {
		log.WithError(err).Warn("Failed to update datadownload")
		return
	}

	if !succeeded {
		// Someone else changed the CR first; leave cleanup to them.
		log.Warn("Datadownload has been updated by others")
		return
	}

	r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))

	log.Info("Datadownload has been cleaned up")
}
func (r *DataDownloadReconciler) exclusiveUpdateDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload,
updateFunc func(*velerov2alpha1api.DataDownload)) (bool, error) {
updated := dd.DeepCopy()
updateFunc(updated)
err := r.client.Update(ctx, updated)
if err == nil {
return true, nil
} else if apierrors.IsConflict(err) {
r.logger.WithField("DataDownload", dd.Name).Error("This data download restore has been accepted by others")
return false, nil
} else {
return false, err

View File

@@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgofake "k8s.io/client-go/kubernetes/fake"
@@ -65,6 +66,29 @@ func dataDownloadBuilder() *builder.DataDownloadBuilder {
}
// initDataDownloadReconciler builds a DataDownloadReconciler for tests from
// boolean error switches.
//
// When exactly four flags are given, a true flag at index 0, 1, 2 or 3 is
// turned into an error with the text "Get error", "Create error",
// "Update error" or "Patch error" respectively; every other slot stays nil.
// With any other number of flags all four slots stay nil. The resulting
// errors are forwarded to initDataDownloadReconcilerWithError.
func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*DataDownloadReconciler, error) {
	messages := [4]string{"Get error", "Create error", "Update error", "Patch error"}
	injected := make([]error, len(messages))

	if len(needError) == len(messages) {
		for i, wanted := range needError {
			if !wanted {
				continue
			}
			injected[i] = fmt.Errorf("%s", messages[i])
		}
	}

	return initDataDownloadReconcilerWithError(objects, injected...)
}
func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...error) (*DataDownloadReconciler, error) {
scheme := runtime.NewScheme()
err := velerov1api.AddToScheme(scheme)
if err != nil {
@@ -112,7 +136,7 @@ func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*D
if err != nil {
return nil, err
}
return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", velerotest.NewLogger()), nil
return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger()), nil
}
func TestDataDownloadReconcile(t *testing.T) {
@@ -132,6 +156,7 @@ func TestDataDownloadReconcile(t *testing.T) {
notMockCleanUp bool
mockCancel bool
mockClose bool
expected *velerov2alpha1api.DataDownload
expectedStatusMsg string
expectedResult *ctrl.Result
}{
@@ -215,7 +240,7 @@ func TestDataDownloadReconcile(t *testing.T) {
dd: builder.ForDataDownload("test-ns", dataDownloadName).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
needErrs: []bool{true, false, false, false},
expectedStatusMsg: "Create error",
expectedStatusMsg: "Get error",
},
{
name: "Unsupported dataDownload type",
@@ -246,6 +271,11 @@ func TestDataDownloadReconcile(t *testing.T) {
expectedStatusMsg: "Error to expose restore exposer",
isExposeErr: true,
},
{
name: "prepare timeout",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).StartTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 5)}).Result(),
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Result(),
},
}
for _, test := range tests {
@@ -345,6 +375,11 @@ func TestDataDownloadReconcile(t *testing.T) {
Namespace: test.dd.Namespace,
}, &dd)
if test.expected != nil {
require.NoError(t, err)
assert.Equal(t, dd.Status.Phase, test.expected.Status.Phase)
}
if test.isGetExposeErr {
assert.Contains(t, dd.Status.Message, test.expectedStatusMsg)
}
@@ -588,3 +623,93 @@ func TestFindDataDownloadForPod(t *testing.T) {
}
}
}
// TestAcceptDataDownload checks acceptDataDownload against three outcomes of
// the underlying client Update call: a generic failure, a conflict, and
// success. Each case runs as its own subtest so a failure names the case.
func TestAcceptDataDownload(t *testing.T) {
	tests := []struct {
		name        string
		dd          *velerov2alpha1api.DataDownload
		needErrs    []error
		succeeded   bool
		expectedErr string
	}{
		{
			name:        "update fail",
			dd:          dataDownloadBuilder().Result(),
			needErrs:    []error{nil, nil, fmt.Errorf("fake-update-error"), nil},
			expectedErr: "fake-update-error",
		},
		{
			name:     "accepted by others",
			dd:       dataDownloadBuilder().Result(),
			needErrs: []error{nil, nil, &fakeAPIStatus{metav1.StatusReasonConflict}, nil},
		},
		{
			name:      "succeed",
			dd:        dataDownloadBuilder().Result(),
			needErrs:  []error{nil, nil, nil, nil},
			succeeded: true,
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			ctx := context.Background()
			r, err := initDataDownloadReconcilerWithError(nil, test.needErrs...)
			require.NoError(t, err)

			// The CR must exist before the reconciler tries to accept it.
			err = r.client.Create(ctx, test.dd)
			require.NoError(t, err)

			succeeded, err := r.acceptDataDownload(ctx, test.dd)
			assert.Equal(t, test.succeeded, succeeded)
			if test.expectedErr == "" {
				assert.NoError(t, err)
			} else {
				assert.EqualError(t, err, test.expectedErr)
			}
		})
	}
}
// TestOnDdPrepareTimeout checks the phase of the stored DataDownload after
// onPrepareTimeout runs: it must stay unchanged when the Update call fails or
// conflicts, and become Failed when the Update succeeds. Each case runs as
// its own subtest so a failure names the case.
func TestOnDdPrepareTimeout(t *testing.T) {
	tests := []struct {
		name     string
		dd       *velerov2alpha1api.DataDownload
		needErrs []error
		expected *velerov2alpha1api.DataDownload
	}{
		{
			name:     "update fail",
			dd:       dataDownloadBuilder().Result(),
			needErrs: []error{nil, nil, fmt.Errorf("fake-update-error"), nil},
			expected: dataDownloadBuilder().Result(),
		},
		{
			name:     "update interrupted",
			dd:       dataDownloadBuilder().Result(),
			needErrs: []error{nil, nil, &fakeAPIStatus{metav1.StatusReasonConflict}, nil},
			expected: dataDownloadBuilder().Result(),
		},
		{
			name:     "succeed",
			dd:       dataDownloadBuilder().Result(),
			needErrs: []error{nil, nil, nil, nil},
			expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Result(),
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			ctx := context.Background()
			r, err := initDataDownloadReconcilerWithError(nil, test.needErrs...)
			require.NoError(t, err)

			err = r.client.Create(ctx, test.dd)
			require.NoError(t, err)

			r.onPrepareTimeout(ctx, test.dd)

			// Read the stored object back. No Get error is injected in any
			// case, so a failure here would invalidate the phase comparison.
			dd := velerov2alpha1api.DataDownload{}
			err = r.client.Get(ctx, kbclient.ObjectKey{
				Name:      test.dd.Name,
				Namespace: test.dd.Namespace,
			}, &dd)
			require.NoError(t, err)

			assert.Equal(t, test.expected.Status.Phase, dd.Status.Phase)
		})
	}
}

View File

@@ -55,6 +55,8 @@ import (
const dataMoverType string = "velero"
const dataUploadDownloadRequestor string = "snapshot-data-upload-download"
const preparingMonitorFrequency time.Duration = time.Minute
// DataUploadReconciler reconciles a DataUpload object
type DataUploadReconciler struct {
client client.Client
@@ -68,11 +70,12 @@ type DataUploadReconciler struct {
logger logrus.FieldLogger
snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
dataPathMgr *datapath.Manager
preparingTimeout time.Duration
}
func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface,
csiSnapshotClient snapshotter.SnapshotV1Interface, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution,
cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, log logrus.FieldLogger) *DataUploadReconciler {
cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger) *DataUploadReconciler {
return &DataUploadReconciler{
client: client,
kubeClient: kubeClient,
@@ -85,6 +88,7 @@ func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interfa
repoEnsurer: repoEnsurer,
snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(kubeClient, csiSnapshotClient, log)},
dataPathMgr: datapath.NewManager(1),
preparingTimeout: preparingTimeout,
}
}
@@ -152,7 +156,12 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted {
if du.Spec.Cancel {
r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName())
} else if du.Status.StartTimestamp != nil {
if time.Since(du.Status.StartTimestamp.Time) >= r.preparingTimeout {
r.onPrepareTimeout(ctx, &du)
}
}
return ctrl.Result{}, nil
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared {
log.Info("Data upload is prepared")
@@ -199,7 +208,6 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Update status to InProgress
original := du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil {
return r.errorOut(ctx, &du, err, "error updating dataupload status", log)
}
@@ -379,8 +387,15 @@ func (r *DataUploadReconciler) OnDataUploadProgress(ctx context.Context, namespa
// re-enqueue the previous related request once the related pod is in running status to keep going on the rest logic. and below logic will avoid handling the unwanted
// pod status and also avoid block others CR handling
func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(r.logger, r.client, &velerov2alpha1api.DataUploadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{})
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
du := object.(*velerov2alpha1api.DataUpload)
return (du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted)
})
return ctrl.NewControllerManagedBy(mgr).
For(&velerov2alpha1api.DataUpload{}).
Watches(s, nil, builder.WithPredicates(gp)).
Watches(&source.Kind{Type: &corev1.Pod{}}, kube.EnqueueRequestsFromMapUpdateFunc(r.findDataUploadForPod),
builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
@@ -429,8 +444,15 @@ func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reco
}
r.logger.WithField("Backup pod", pod.Name).Infof("Preparing dataupload %s", du.Name)
if err := r.patchDataUpload(context.Background(), du, prepareDataUpload); err != nil {
r.logger.WithField("Backup pod", pod.Name).WithError(err).Error("failed to patch dataupload")
// we don't expect anyone else to update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataUpload(context.Background(), du, r.prepareDataUpload)
if err != nil || !updated {
r.logger.WithFields(logrus.Fields{
"Dataupload": du.Name,
"Backup pod": pod.Name,
"updated": updated,
}).WithError(err).Warn("failed to patch dataupload, prepare will halt for this dataupload")
return []reconcile.Request{}
}
@@ -467,18 +489,9 @@ func (r *DataUploadReconciler) FindDataUploads(ctx context.Context, cli client.C
return dataUploads, nil
}
func (r *DataUploadReconciler) patchDataUpload(ctx context.Context, req *velerov2alpha1api.DataUpload, mutate func(*velerov2alpha1api.DataUpload)) error {
original := req.DeepCopy()
mutate(req)
if err := r.client.Patch(ctx, req, client.MergeFrom(original)); err != nil {
return errors.Wrap(err, "error patching DataUpload")
}
return nil
}
func prepareDataUpload(du *velerov2alpha1api.DataUpload) {
func (r *DataUploadReconciler) prepareDataUpload(du *velerov2alpha1api.DataUpload) {
du.Status.Phase = velerov2alpha1api.DataUploadPhasePrepared
du.Status.Node = r.nodeName
}
func (r *DataUploadReconciler) errorOut(ctx context.Context, du *velerov2alpha1api.DataUpload, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
@@ -511,19 +524,73 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel
}
// acceptDataUpload tries to claim du for this reconciler by setting its phase
// to Accepted and stamping Status.StartTimestamp with r.Clock.Now().
//
// The write goes through exclusiveUpdateDataUpload, which works on a deep
// copy, so du itself is not modified here.
//
// Returns:
//   - (true, nil) when this reconciler's update was applied;
//   - (false, nil) when the exclusive update reports it did not succeed
//     without an error (the CR was updated by someone else);
//   - (false, err) when the update failed with any other error.
func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload) (bool, error) {
	r.logger.Infof("Accepting data upload %s", du.Name)

	// Every data upload controller (one per node-agent) tries to update the
	// same DataUpload CR; only one of them succeeds, and that one carries on
	// with the later logic.
	succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(target *velerov2alpha1api.DataUpload) {
		target.Status.Phase = velerov2alpha1api.DataUploadPhaseAccepted
		target.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
	})
	if err != nil {
		return false, err
	}

	if succeeded {
		r.logger.WithField("Dataupload", du.Name).Infof("This dataupload has been accepted by %s", r.nodeName)
		return true, nil
	}

	r.logger.WithField("Dataupload", du.Name).Info("This dataupload has been accepted by others")
	return false, nil
}
// onPrepareTimeout handles a DataUpload whose preparation has timed out.
//
// It marks the DataUpload as Failed with a timeout message through
// exclusiveUpdateDataUpload. Only when that update is applied by this
// reconciler does it look up the snapshot exposer registered for the
// DataUpload's snapshot type and call its CleanUp. Update errors, lost
// updates and a missing exposer are logged as warnings and end the handling.
func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov2alpha1api.DataUpload) {
	log := r.logger.WithField("Dataupload", du.Name)

	log.Info("Timeout happened for preparing dataupload")

	succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(target *velerov2alpha1api.DataUpload) {
		target.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed
		target.Status.Message = "timeout on preparing data upload"
	})
	if err != nil {
		log.WithError(err).Warn("Failed to update dataupload")
		return
	}

	if !succeeded {
		// Someone else changed the CR first; leave cleanup to them.
		log.Warn("Dataupload has been updated by others")
		return
	}

	ep, ok := r.snapshotExposerList[du.Spec.SnapshotType]
	if !ok {
		log.WithError(fmt.Errorf("%v type of snapshot exposer is not exist", du.Spec.SnapshotType)).
			Warn("Failed to clean up resources on prepare timeout")
		return
	}

	var volumeSnapshotName string
	// Other exposers should get their own condition here. CSISnapshot is a
	// pointer, so guard against nil instead of panicking during cleanup.
	if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI && du.Spec.CSISnapshot != nil {
		volumeSnapshotName = du.Spec.CSISnapshot.VolumeSnapshot
	}

	ep.CleanUp(ctx, getOwnerObject(du), volumeSnapshotName, du.Spec.SourceNamespace)

	log.Info("Dataupload has been cleaned up")
}
func (r *DataUploadReconciler) exclusiveUpdateDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload,
updateFunc func(*velerov2alpha1api.DataUpload)) (bool, error) {
updated := du.DeepCopy()
updateFunc(updated)
err := r.client.Update(ctx, updated)
if err == nil {
r.logger.WithField("Dataupload", du.Name).Infof("This datauplod backup has been accepted by %s", r.nodeName)
return true, nil
} else if apierrors.IsConflict(err) {
r.logger.WithField("Dataupload", du.Name).Info("This datauplod backup has been accepted by others")
return false, nil
} else {
return false, err

View File

@@ -58,45 +58,68 @@ const fakeSnapshotType velerov2alpha1api.SnapshotType = "fake-snapshot"
type FakeClient struct {
kbclient.Client
getError bool
createError bool
updateError bool
patchError bool
getError error
createError error
updateError error
patchError error
}
func (c *FakeClient) Get(ctx context.Context, key kbclient.ObjectKey, obj kbclient.Object) error {
if c.getError {
return fmt.Errorf("Create error")
if c.getError != nil {
return c.getError
}
return c.Client.Get(ctx, key, obj)
}
func (c *FakeClient) Create(ctx context.Context, obj kbclient.Object, opts ...kbclient.CreateOption) error {
if c.createError {
return fmt.Errorf("Create error")
if c.createError != nil {
return c.createError
}
return c.Client.Create(ctx, obj, opts...)
}
func (c *FakeClient) Update(ctx context.Context, obj kbclient.Object, opts ...kbclient.UpdateOption) error {
if c.updateError {
return fmt.Errorf("Update error")
if c.updateError != nil {
return c.updateError
}
return c.Client.Update(ctx, obj, opts...)
}
func (c *FakeClient) Patch(ctx context.Context, obj kbclient.Object, patch kbclient.Patch, opts ...kbclient.PatchOption) error {
if c.patchError {
return fmt.Errorf("Patch error")
if c.patchError != nil {
return c.patchError
}
return c.Client.Patch(ctx, obj, patch, opts...)
}
// initDataUploaderReconciler builds a DataUploadReconciler for tests from
// boolean error switches.
//
// When exactly four flags are given, a true flag at index 0, 1, 2 or 3 is
// turned into an error with the text "Get error", "Create error",
// "Update error" or "Patch error" respectively; every other slot stays nil.
// With any other number of flags all four slots stay nil. The resulting
// errors are forwarded to initDataUploaderReconcilerWithError.
func initDataUploaderReconciler(needError ...bool) (*DataUploadReconciler, error) {
	messages := [4]string{"Get error", "Create error", "Update error", "Patch error"}
	injected := make([]error, len(messages))

	if len(needError) == len(messages) {
		for i, wanted := range needError {
			if !wanted {
				continue
			}
			injected[i] = fmt.Errorf("%s", messages[i])
		}
	}

	return initDataUploaderReconcilerWithError(injected...)
}
func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconciler, error) {
vscName := "fake-vsc"
vsObject := &snapshotv1api.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
@@ -170,7 +193,7 @@ func initDataUploaderReconciler(needError ...bool) (*DataUploadReconciler, error
return nil, err
}
return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), nil,
testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, velerotest.NewLogger()), nil
testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger()), nil
}
func dataUploadBuilder() *builder.DataUploadBuilder {
@@ -277,7 +300,7 @@ func TestReconcile(t *testing.T) {
expectedProcessed: false,
expected: nil,
expectedRequeue: ctrl.Result{},
expectedErrMsg: "getting DataUpload: Create error",
expectedErrMsg: "getting DataUpload: Get error",
needErrs: []bool{true, false, false, false},
}, {
name: "Unsupported data mover type",
@@ -339,6 +362,11 @@ func TestReconcile(t *testing.T) {
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Minute},
},
{
name: "prepare timeout",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).SnapshotType(fakeSnapshotType).StartTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 5)}).Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
},
}
for _, test := range tests {
@@ -607,3 +635,107 @@ func TestFindDataUploadForPod(t *testing.T) {
}
}
}
// fakeAPIStatus is a test-only error that also exposes a Status method,
// reporting nothing but a configurable status reason.
type fakeAPIStatus struct {
	reason metav1.StatusReason
}

// Status returns a metav1.Status in which only Reason is set, taken from the
// receiver's reason.
func (f *fakeAPIStatus) Status() metav1.Status {
	var status metav1.Status
	status.Reason = f.reason
	return status
}

// Error returns the status reason as the error text.
func (f *fakeAPIStatus) Error() string {
	return string(f.reason)
}
// TestAcceptDataUpload exercises acceptDataUpload against a reconciler whose
// client has errors injected through initDataUploaderReconcilerWithError.
//
// Each case supplies four injected errors; judging by the case names, the
// third entry is the one returned by the client's Update call. The test checks
// both the returned "succeeded" flag and the returned error text.
func TestAcceptDataUpload(t *testing.T) {
	tests := []struct {
		name        string
		du          *velerov2alpha1api.DataUpload
		needErrs    []error
		succeeded   bool
		expectedErr string
	}{
		{
			name:        "update fail",
			du:          dataUploadBuilder().Result(),
			needErrs:    []error{nil, nil, fmt.Errorf("fake-update-error"), nil},
			expectedErr: "fake-update-error",
		},
		{
			name:     "accepted by others",
			du:       dataUploadBuilder().Result(),
			needErrs: []error{nil, nil, &fakeAPIStatus{metav1.StatusReasonConflict}, nil},
		},
		{
			name:      "succeed",
			du:        dataUploadBuilder().Result(),
			needErrs:  []error{nil, nil, nil, nil},
			succeeded: true,
		},
	}

	for _, test := range tests {
		// Run every case as a named subtest so a failure is attributed to its
		// case and a require failure does not abort the remaining cases.
		t.Run(test.name, func(t *testing.T) {
			ctx := context.Background()

			r, err := initDataUploaderReconcilerWithError(test.needErrs...)
			require.NoError(t, err)

			err = r.client.Create(ctx, test.du)
			require.NoError(t, err)

			succeeded, err := r.acceptDataUpload(ctx, test.du)
			assert.Equal(t, test.succeeded, succeeded)

			if test.expectedErr == "" {
				assert.NoError(t, err)
			} else {
				assert.EqualError(t, err, test.expectedErr)
			}
		})
	}
}
// TestOnDuPrepareTimeout exercises onPrepareTimeout against a reconciler whose
// client has errors injected through initDataUploaderReconcilerWithError.
//
// After calling onPrepareTimeout, the DataUpload is read back from the client
// and its phase is compared with the expected phase: unchanged when the update
// is rejected (judging by the case names, the third injected error is the one
// returned by Update), and Failed when the update goes through.
func TestOnDuPrepareTimeout(t *testing.T) {
	tests := []struct {
		name     string
		du       *velerov2alpha1api.DataUpload
		needErrs []error
		expected *velerov2alpha1api.DataUpload
	}{
		{
			name:     "update fail",
			du:       dataUploadBuilder().Result(),
			needErrs: []error{nil, nil, fmt.Errorf("fake-update-error"), nil},
			expected: dataUploadBuilder().Result(),
		},
		{
			name:     "update interrupted",
			du:       dataUploadBuilder().Result(),
			needErrs: []error{nil, nil, &fakeAPIStatus{metav1.StatusReasonConflict}, nil},
			expected: dataUploadBuilder().Result(),
		},
		{
			name:     "succeed",
			du:       dataUploadBuilder().Result(),
			needErrs: []error{nil, nil, nil, nil},
			expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
		},
	}

	for _, test := range tests {
		// Run every case as a named subtest so a failure is attributed to its
		// case and a require failure does not abort the remaining cases.
		t.Run(test.name, func(t *testing.T) {
			ctx := context.Background()

			r, err := initDataUploaderReconcilerWithError(test.needErrs...)
			require.NoError(t, err)

			err = r.client.Create(ctx, test.du)
			require.NoError(t, err)

			r.onPrepareTimeout(ctx, test.du)

			// No Get error is injected in any case, so the read-back must
			// succeed. Checking it prevents a failed read (which leaves du at
			// its zero value) from being mistaken for an unchanged phase.
			du := velerov2alpha1api.DataUpload{}
			err = r.client.Get(ctx, kbclient.ObjectKey{
				Name:      test.du.Name,
				Namespace: test.du.Namespace,
			}, &du)
			require.NoError(t, err)

			assert.Equal(t, test.expected.Status.Phase, du.Status.Phase)
		})
	}
}

View File

@@ -112,7 +112,7 @@ func (r *RepositoryService) exec(cmd *restic.Command, bsl *velerov1api.BackupSto
cmd.ExtraFlags = append(cmd.ExtraFlags, skipTLSRet)
}
stdout, stderr, err := veleroexec.RunCommand(cmd.Cmd())
stdout, stderr, err := veleroexec.RunCommandWithLog(cmd.Cmd(), r.log)
r.log.WithFields(logrus.Fields{
"repository": cmd.RepoName(),
"command": cmd.String(),

View File

@@ -86,6 +86,7 @@ func RunBackup(backupCmd *Command, log logrus.FieldLogger, updater uploader.Prog
err := cmd.Start()
if err != nil {
exec.LogErrorAsExitCode(err, log)
return stdoutBuf.String(), stderrBuf.String(), err
}
@@ -119,6 +120,7 @@ func RunBackup(backupCmd *Command, log logrus.FieldLogger, updater uploader.Prog
err = cmd.Wait()
if err != nil {
exec.LogErrorAsExitCode(err, log)
return stdoutBuf.String(), stderrBuf.String(), err
}
quit <- struct{}{}
@@ -229,7 +231,7 @@ func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updater uploader.Pr
}
}()
stdout, stderr, err := exec.RunCommand(restoreCmd.Cmd())
stdout, stderr, err := exec.RunCommandWithLog(restoreCmd.Cmd(), log)
quit <- struct{}{}
// update progress to 100%

View File

@@ -22,6 +22,7 @@ import (
"os/exec"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// RunCommand runs a command and returns its stdout, stderr, and its returned
@@ -52,3 +53,25 @@ func RunCommand(cmd *exec.Cmd) (string, string, error) {
return stdout, stderr, runErr
}
// RunCommandWithLog runs cmd through RunCommand, hands the resulting error to
// LogErrorAsExitCode together with log, and then returns RunCommand's stdout,
// stderr, and error unchanged.
func RunCommandWithLog(cmd *exec.Cmd, log logrus.FieldLogger) (string, string, error) {
	stdout, stderr, runErr := RunCommand(cmd)

	LogErrorAsExitCode(runErr, log)

	return stdout, stderr, runErr
}
// LogErrorAsExitCode logs details about a failed command execution.
//
// A nil err is ignored. When err is, or wraps, an *exec.ExitError, the exit
// code, process ID, and exit description are logged at error level, followed
// by a warning when the exit code is -1. Any other error is logged at info
// level with the error attached.
func LogErrorAsExitCode(err error, log logrus.FieldLogger) {
	if err == nil {
		return
	}

	// errors.As is used instead of a direct type assertion so that an
	// *exec.ExitError wrapped with additional context is still recognized.
	var exitError *exec.ExitError
	if !errors.As(err, &exitError) {
		log.WithError(err).Info("Error cannot be convert to ExitError format.")
		return
	}

	log.Errorf("Restic command fail with ExitCode: %d. Process ID is %d, Exit error is: %s", exitError.ExitCode(), exitError.Pid(), exitError.String())

	// Golang's os.exec -1 ExitCode means signal kill. Usually this is caused
	// by CGroup's OOM. Log a warning to notify the user.
	// https://github.com/golang/go/blob/master/src/os/exec_posix.go#L128-L136
	if exitError.ExitCode() == -1 {
		log.Warnf("The ExitCode is -1, which means the process is terminated by signal. Usually this is caused by CGroup kill due to out of memory. Please check whether there is such information in the work nodes' dmesg log.")
	}
}

View File

@@ -74,7 +74,7 @@ Integrated Edition (formerly VMware Enterprise PKS), or Microsoft Azure.
**RancherOS**
Update the host path for volumes in the nonde-agent DaemonSet in the Velero namespace from `/var/lib/kubelet/pods` to
Update the host path for volumes in the node-agent DaemonSet in the Velero namespace from `/var/lib/kubelet/pods` to
`/opt/rke/var/lib/kubelet/pods`.
```yaml
@@ -539,7 +539,7 @@ that it's backing up for the volumes to be backed up using FSB.
5. Meanwhile, each `PodVolumeBackup` is handled by the controller on the appropriate node, which:
- has a hostPath volume mount of `/var/lib/kubelet/pods` to access the pod volume data
- finds the pod volume's subdirectory within the above volume
- based on the path selection, Velero inokes restic or kopia for backup
- based on the path selection, Velero invokes restic or kopia for backup
- updates the status of the custom resource to `Completed` or `Failed`
6. As each `PodVolumeBackup` finishes, the main Velero process adds it to the Velero backup in a file named
`<backup-name>-podvolumebackups.json.gz`. This file gets uploaded to object storage alongside the backup tarball.
@@ -556,7 +556,7 @@ It will be used for restores, as seen in the next section.
3. Velero adds an init container to the pod, whose job is to wait for all FSB restores for the pod to complete (more
on this shortly)
4. Velero creates the pod, with the added init container, by submitting it to the Kubernetes API. Then, the Kubernetes
scheduler schedules this pod to a worker node, and the pod must be in a running state. If the pod fails to start for
scheduler schedules this pod to a worker node. If the pod fails to be scheduled for
some reason (e.g. lack of cluster resources), the FSB restore will not be done.
5. Velero creates a `PodVolumeRestore` custom resource for each volume to be restored in the pod
6. The main Velero process now waits for each `PodVolumeRestore` resource to complete or fail
@@ -564,7 +564,7 @@ some reason (i.e. lack of cluster resources), the FSB restore will not be done.
- has a hostPath volume mount of `/var/lib/kubelet/pods` to access the pod volume data
- waits for the pod to be running the init container
- finds the pod volume's subdirectory within the above volume
- based on the path selection, Velero inokes restic or kopia for restore
- based on the path selection, Velero invokes restic or kopia for restore
- on success, writes a file into the pod volume, in a `.velero` subdirectory, whose name is the UID of the Velero
restore that this pod volume restore is for
- updates the status of the custom resource to `Completed` or `Failed`

View File

@@ -74,7 +74,7 @@ Integrated Edition (formerly VMware Enterprise PKS), or Microsoft Azure.
**RancherOS**
Update the host path for volumes in the nonde-agent DaemonSet in the Velero namespace from `/var/lib/kubelet/pods` to
Update the host path for volumes in the node-agent DaemonSet in the Velero namespace from `/var/lib/kubelet/pods` to
`/opt/rke/var/lib/kubelet/pods`.
```yaml
@@ -539,7 +539,7 @@ that it's backing up for the volumes to be backed up using FSB.
5. Meanwhile, each `PodVolumeBackup` is handled by the controller on the appropriate node, which:
- has a hostPath volume mount of `/var/lib/kubelet/pods` to access the pod volume data
- finds the pod volume's subdirectory within the above volume
- based on the path selection, Velero inokes restic or kopia for backup
- based on the path selection, Velero invokes restic or kopia for backup
- updates the status of the custom resource to `Completed` or `Failed`
6. As each `PodVolumeBackup` finishes, the main Velero process adds it to the Velero backup in a file named
`<backup-name>-podvolumebackups.json.gz`. This file gets uploaded to object storage alongside the backup tarball.
@@ -564,7 +564,7 @@ some reason (i.e. lack of cluster resources), the FSB restore will not be done.
- has a hostPath volume mount of `/var/lib/kubelet/pods` to access the pod volume data
- waits for the pod to be running the init container
- finds the pod volume's subdirectory within the above volume
- based on the path selection, Velero inokes restic or kopia for restore
- based on the path selection, Velero invokes restic or kopia for restore
- on success, writes a file into the pod volume, in a `.velero` subdirectory, whose name is the UID of the Velero
restore that this pod volume restore is for
- updates the status of the custom resource to `Completed` or `Failed`

View File

@@ -74,7 +74,7 @@ Integrated Edition (formerly VMware Enterprise PKS), or Microsoft Azure.
**RancherOS**
Update the host path for volumes in the nonde-agent DaemonSet in the Velero namespace from `/var/lib/kubelet/pods` to
Update the host path for volumes in the node-agent DaemonSet in the Velero namespace from `/var/lib/kubelet/pods` to
`/opt/rke/var/lib/kubelet/pods`.
```yaml
@@ -539,7 +539,7 @@ that it's backing up for the volumes to be backed up using FSB.
5. Meanwhile, each `PodVolumeBackup` is handled by the controller on the appropriate node, which:
- has a hostPath volume mount of `/var/lib/kubelet/pods` to access the pod volume data
- finds the pod volume's subdirectory within the above volume
- based on the path selection, Velero inokes restic or kopia for backup
- based on the path selection, Velero invokes restic or kopia for backup
- updates the status of the custom resource to `Completed` or `Failed`
6. As each `PodVolumeBackup` finishes, the main Velero process adds it to the Velero backup in a file named
`<backup-name>-podvolumebackups.json.gz`. This file gets uploaded to object storage alongside the backup tarball.
@@ -556,7 +556,7 @@ It will be used for restores, as seen in the next section.
3. Velero adds an init container to the pod, whose job is to wait for all FSB restores for the pod to complete (more
on this shortly)
4. Velero creates the pod, with the added init container, by submitting it to the Kubernetes API. Then, the Kubernetes
scheduler schedules this pod to a worker node, and the pod must be in a running state. If the pod fails to start for
scheduler schedules this pod to a worker node. If the pod fails to be scheduled for
some reason (e.g. lack of cluster resources), the FSB restore will not be done.
5. Velero creates a `PodVolumeRestore` custom resource for each volume to be restored in the pod
6. The main Velero process now waits for each `PodVolumeRestore` resource to complete or fail
@@ -564,7 +564,7 @@ some reason (i.e. lack of cluster resources), the FSB restore will not be done.
- has a hostPath volume mount of `/var/lib/kubelet/pods` to access the pod volume data
- waits for the pod to be running the init container
- finds the pod volume's subdirectory within the above volume
- based on the path selection, Velero inokes restic or kopia for restore
- based on the path selection, Velero invokes restic or kopia for restore
- on success, writes a file into the pod volume, in a `.velero` subdirectory, whose name is the UID of the Velero
restore that this pod volume restore is for
- updates the status of the custom resource to `Completed` or `Failed`

View File

@@ -73,6 +73,7 @@ BSL_PREFIX ?=
BSL_CONFIG ?=
VSL_CONFIG ?=
CLOUD_PROVIDER ?=
STANDBY_CLUSTER_CLOUD_PROVIDER ?=
OBJECT_STORE_PROVIDER ?=
INSTALL_VELERO ?= true
REGISTRY_CREDENTIAL_FILE ?=
@@ -99,6 +100,9 @@ STANDBY_CLUSTER ?=
UPLOADER_TYPE ?=
SNAPSHOT_MOVE_DATA ?= false
DATA_MOVER_PLUGIN ?=
.PHONY:ginkgo
ginkgo: # Make sure ginkgo is in $GOPATH/bin
@@ -143,7 +147,10 @@ run: ginkgo
-velero-server-debug-mode=$(VELERO_SERVER_DEBUG_MODE) \
-default-cluster=$(DEFAULT_CLUSTER) \
-standby-cluster=$(STANDBY_CLUSTER) \
-uploader-type=$(UPLOADER_TYPE)
-uploader-type=$(UPLOADER_TYPE) \
-snapshot-move-data=$(SNAPSHOT_MOVE_DATA) \
-data-mover-plugin=$(DATA_MOVER_PLUGIN) \
-standby-cluster-cloud-provider=$(STANDBY_CLUSTER_CLOUD_PROVIDER)
build: ginkgo
mkdir -p $(OUTPUT_DIR)

View File

@@ -87,7 +87,7 @@ func BackupRestoreTest(useVolumeSnapshots bool) {
} else {
veleroCfg.DefaultVolumesToFsBackup = !useVolumeSnapshots
}
Expect(VeleroInstall(context.Background(), &veleroCfg)).To(Succeed())
Expect(VeleroInstall(context.Background(), &veleroCfg, false)).To(Succeed())
}
backupName = "backup-" + UUIDgen.String()
restoreName = "restore-" + UUIDgen.String()
@@ -125,12 +125,9 @@ func BackupRestoreTest(useVolumeSnapshots bool) {
veleroCfg.DefaultVolumesToFsBackup = useVolumeSnapshots
}
Expect(VeleroInstall(context.Background(), &veleroCfg)).To(Succeed())
Expect(VeleroInstall(context.Background(), &veleroCfg, false)).To(Succeed())
}
Expect(VeleroAddPluginsForProvider(context.TODO(), veleroCfg.VeleroCLI,
veleroCfg.VeleroNamespace, veleroCfg.AdditionalBSLProvider,
veleroCfg.AddBSLPlugins, veleroCfg.Features)).To(Succeed())
Expect(VeleroAddPluginsForProvider(context.TODO(), veleroCfg.VeleroCLI, veleroCfg.VeleroNamespace, veleroCfg.AdditionalBSLProvider)).To(Succeed())
// Create Secret for additional BSL
secretName := fmt.Sprintf("bsl-credentials-%s", UUIDgen)

View File

@@ -59,7 +59,7 @@ func BackupsSyncTest() {
if VeleroCfg.InstallVelero {
veleroCfg := VeleroCfg
veleroCfg.UseVolumeSnapshots = false
Expect(VeleroInstall(context.Background(), &VeleroCfg)).To(Succeed())
Expect(VeleroInstall(context.Background(), &VeleroCfg, false)).To(Succeed())
}
})
@@ -109,7 +109,7 @@ func BackupsSyncTest() {
By("Install velero", func() {
veleroCfg := VeleroCfg
veleroCfg.UseVolumeSnapshots = false
Expect(VeleroInstall(ctx, &VeleroCfg)).To(Succeed())
Expect(VeleroInstall(ctx, &VeleroCfg, false)).To(Succeed())
})
By("Check all backups in object storage are synced to Velero", func() {

View File

@@ -70,7 +70,7 @@ func TTLTest() {
// Make sure GCFrequency is shorter than backup TTL
veleroCfg.GCFrequency = "4m0s"
veleroCfg.UseVolumeSnapshots = useVolumeSnapshots
Expect(VeleroInstall(context.Background(), &veleroCfg)).To(Succeed())
Expect(VeleroInstall(context.Background(), &veleroCfg, false)).To(Succeed())
}
})

View File

@@ -100,7 +100,7 @@ func APIExtensionsVersionsTest() {
Expect(KubectlConfigUseContext(context.Background(), veleroCfg.DefaultCluster)).To(Succeed())
veleroCfg.Features = "EnableAPIGroupVersions"
veleroCfg.UseVolumeSnapshots = false
Expect(VeleroInstall(context.Background(), &veleroCfg)).To(Succeed())
Expect(VeleroInstall(context.Background(), &veleroCfg, false)).To(Succeed())
})
By(fmt.Sprintf("Install CRD of apiextenstions v1beta1 in cluster-A (%s)", veleroCfg.DefaultCluster), func() {
@@ -129,7 +129,7 @@ func APIExtensionsVersionsTest() {
By(fmt.Sprintf("Install Velero in cluster-B (%s) to restore workload", veleroCfg.StandbyCluster), func() {
Expect(KubectlConfigUseContext(context.Background(), veleroCfg.StandbyCluster)).To(Succeed())
veleroCfg.ClientToInstallVelero = veleroCfg.StandbyClient
Expect(VeleroInstall(context.Background(), &veleroCfg)).To(Succeed())
Expect(VeleroInstall(context.Background(), &veleroCfg, false)).To(Succeed())
})
By(fmt.Sprintf("Waiting for backups sync to Velero in cluster-B (%s)", veleroCfg.StandbyCluster), func() {

View File

@@ -75,7 +75,7 @@ func APIGropuVersionsTest() {
if veleroCfg.InstallVelero {
veleroCfg.Features = "EnableAPIGroupVersions"
veleroCfg.UseVolumeSnapshots = false
err = VeleroInstall(context.Background(), &veleroCfg)
err = VeleroInstall(context.Background(), &veleroCfg, false)
Expect(err).NotTo(HaveOccurred())
}
testCaseNum = 4

View File

@@ -115,7 +115,7 @@ func (n *NamespaceMapping) Verify() error {
func (n *NamespaceMapping) Clean() error {
if !n.VeleroCfg.Debug {
if err := DeleteStorageClass(context.Background(), n.Client, "kibishii-storage-class"); err != nil {
if err := DeleteStorageClass(context.Background(), n.Client, KibishiiStorageClassName); err != nil {
return err
}
for _, ns := range n.MappedNamespaceList {

View File

@@ -104,9 +104,7 @@ func BslDeletionTest(useVolumeSnapshots bool) {
}
By(fmt.Sprintf("Add an additional plugin for provider %s", veleroCfg.AdditionalBSLProvider), func() {
Expect(VeleroAddPluginsForProvider(context.TODO(), veleroCfg.VeleroCLI,
veleroCfg.VeleroNamespace, veleroCfg.AdditionalBSLProvider,
veleroCfg.AddBSLPlugins, veleroCfg.Features)).To(Succeed())
Expect(VeleroAddPluginsForProvider(context.TODO(), veleroCfg.VeleroCLI, veleroCfg.VeleroNamespace, veleroCfg.AdditionalBSLProvider)).To(Succeed())
})
additionalBsl := fmt.Sprintf("bsl-%s", UUIDgen)

View File

@@ -84,7 +84,9 @@ func init() {
flag.StringVar(&VeleroCfg.StandbyCluster, "standby-cluster", "", "Standby cluster context for migration test.")
flag.StringVar(&VeleroCfg.UploaderType, "uploader-type", "", "Identify persistent volume backup uploader.")
flag.BoolVar(&VeleroCfg.VeleroServerDebugMode, "velero-server-debug-mode", false, "Identify persistent volume backup uploader.")
flag.BoolVar(&VeleroCfg.SnapshotMoveData, "snapshot-move-data", false, "Install default plugin for data mover.")
flag.StringVar(&VeleroCfg.DataMoverPlugin, "data-mover-plugin", "", "Install customized plugin for data mover.")
// The help text previously duplicated the data-mover-plugin description.
flag.StringVar(&VeleroCfg.StandbyClusterCloudProvider, "standby-cluster-cloud-provider", "", "Cloud provider of the standby cluster.")
}
var _ = Describe("[APIGroup][APIVersion] Velero tests with various CRD API group versions", APIGropuVersionsTest)

View File

@@ -59,13 +59,14 @@ func MigrationTest(useVolumeSnapshots bool, veleroCLI2Version VeleroCLI2Version)
BeforeEach(func() {
veleroCfg = VeleroCfg
UUIDgen, err = uuid.NewRandom()
migrationNamespace = "migration-workload-" + UUIDgen.String()
migrationNamespace = "migration-" + UUIDgen.String()
if useVolumeSnapshots && veleroCfg.CloudProvider == "kind" {
Skip("Volume snapshots not supported on kind")
}
if useVolumeSnapshots && veleroCfg.CloudProvider == "aws" {
if useVolumeSnapshots && veleroCfg.CloudProvider == "aws" && !veleroCfg.SnapshotMoveData {
Skip("Volume snapshots migration not supported on AWS provisioned by Sheperd public pool")
}
if veleroCfg.DefaultCluster == "" && veleroCfg.StandbyCluster == "" {
Skip("Migration test needs 2 clusters")
}
@@ -79,9 +80,10 @@ func MigrationTest(useVolumeSnapshots bool, veleroCLI2Version VeleroCLI2Version)
})
AfterEach(func() {
if !veleroCfg.Debug {
By("Clean backups after test", func() {
DeleteBackups(context.Background(), *veleroCfg.DefaultClient)
})
// TODO: delete only the backups created by this test case, not all backups
// By("Clean backups after test", func() {
// DeleteBackups(context.Background(), *veleroCfg.DefaultClient)
// })
if veleroCfg.InstallVelero {
By(fmt.Sprintf("Uninstall Velero and delete sample workload namespace %s", migrationNamespace), func() {
Expect(KubectlConfigUseContext(context.Background(), veleroCfg.DefaultCluster)).To(Succeed())
@@ -104,6 +106,16 @@ func MigrationTest(useVolumeSnapshots bool, veleroCLI2Version VeleroCLI2Version)
})
When("kibishii is the sample workload", func() {
It("should be successfully backed up and restored to the default BackupStorageLocation", func() {
if veleroCfg.SnapshotMoveData {
if !useVolumeSnapshots {
Skip("FSB migration test is not needed in data mover scenario")
}
// TODO: remove this block once the Velero version in cluster A is greater than V1.11 for all migration paths.
if veleroCLI2Version.VeleroVersion != "self" {
Skip(fmt.Sprintf("Only V1.12 support data mover scenario instead of %s", veleroCLI2Version.VeleroVersion))
}
}
oneHourTimeout, ctxCancel := context.WithTimeout(context.Background(), time.Minute*60)
defer ctxCancel()
flag.Parse()
@@ -132,25 +144,20 @@ func MigrationTest(useVolumeSnapshots bool, veleroCLI2Version VeleroCLI2Version)
OriginVeleroCfg.ClientToInstallVelero = OriginVeleroCfg.DefaultClient
OriginVeleroCfg.UseVolumeSnapshots = useVolumeSnapshots
OriginVeleroCfg.UseNodeAgent = !useVolumeSnapshots
// TODO: self means 1.10 and upper version
if veleroCLI2Version.VeleroVersion != "self" {
// self represents v1.12
if veleroCLI2Version.VeleroVersion == "self" {
if OriginVeleroCfg.SnapshotMoveData {
OriginVeleroCfg.UseNodeAgent = true
}
} else {
Expect(err).To(Succeed())
fmt.Printf("Using default images address of Velero CLI %s\n", veleroCLI2Version.VeleroVersion)
OriginVeleroCfg.VeleroImage = ""
OriginVeleroCfg.RestoreHelperImage = ""
OriginVeleroCfg.Plugins = ""
//TODO: Remove this once origin Velero version is 1.10 and upper
OriginVeleroCfg.UploaderType = ""
if supportUploaderType {
OriginVeleroCfg.UseRestic = false
OriginVeleroCfg.UseNodeAgent = !useVolumeSnapshots
} else {
OriginVeleroCfg.UseRestic = !useVolumeSnapshots
OriginVeleroCfg.UseNodeAgent = false
}
}
Expect(VeleroInstall(context.Background(), &OriginVeleroCfg)).To(Succeed())
Expect(VeleroInstall(context.Background(), &OriginVeleroCfg, false)).To(Succeed())
if veleroCLI2Version.VeleroVersion != "self" {
Expect(CheckVeleroVersion(context.Background(), OriginVeleroCfg.VeleroCLI,
OriginVeleroCfg.MigrateFromVeleroVersion)).To(Succeed())
@@ -167,10 +174,15 @@ func MigrationTest(useVolumeSnapshots bool, veleroCLI2Version VeleroCLI2Version)
fmt.Sprintf("Failed to create namespace %s to install Kibishii workload", migrationNamespace))
})
KibishiiData := *DefaultKibishiiData
By("Deploy sample workload of Kibishii", func() {
if OriginVeleroCfg.SnapshotMoveData {
KibishiiData.ExpectedNodes = 6
}
Expect(KibishiiPrepareBeforeBackup(oneHourTimeout, *veleroCfg.DefaultClient, veleroCfg.CloudProvider,
migrationNamespace, veleroCfg.RegistryCredentialFile, veleroCfg.Features,
veleroCfg.KibishiiDirectory, useVolumeSnapshots, DefaultKibishiiData)).To(Succeed())
veleroCfg.KibishiiDirectory, useVolumeSnapshots, &KibishiiData)).To(Succeed())
})
By(fmt.Sprintf("Backup namespace %s", migrationNamespace), func() {
@@ -178,6 +190,7 @@ func MigrationTest(useVolumeSnapshots bool, veleroCLI2Version VeleroCLI2Version)
BackupStorageClassCfg.BackupName = backupScName
BackupStorageClassCfg.IncludeResources = "StorageClass"
BackupStorageClassCfg.IncludeClusterResources = true
//TODO Remove UseRestic parameter once minor version is 1.10 or upper
BackupStorageClassCfg.UseResticIfFSBackup = !supportUploaderType
Expect(VeleroBackupNamespace(context.Background(), OriginVeleroCfg.VeleroCLI,
@@ -195,6 +208,7 @@ func MigrationTest(useVolumeSnapshots bool, veleroCLI2Version VeleroCLI2Version)
BackupCfg.DefaultVolumesToFsBackup = !useVolumeSnapshots
//TODO Remove UseRestic parameter once minor version is 1.10 or upper
BackupCfg.UseResticIfFSBackup = !supportUploaderType
BackupCfg.SnapshotMoveData = OriginVeleroCfg.SnapshotMoveData
Expect(VeleroBackupNamespace(context.Background(), OriginVeleroCfg.VeleroCLI,
OriginVeleroCfg.VeleroNamespace, BackupCfg)).To(Succeed(), func() string {
@@ -211,21 +225,27 @@ func MigrationTest(useVolumeSnapshots bool, veleroCLI2Version VeleroCLI2Version)
migrationNamespace, 2)).To(Succeed())
})
}
var snapshotCheckPoint SnapshotCheckPoint
snapshotCheckPoint.NamespaceBackedUp = migrationNamespace
By("Snapshot should be created in cloud object store", func() {
snapshotCheckPoint, err := GetSnapshotCheckPoint(*veleroCfg.DefaultClient, veleroCfg, 2,
migrationNamespace, backupName, KibishiiPVCNameList)
Expect(err).NotTo(HaveOccurred(), "Fail to get snapshot checkpoint")
Expect(SnapshotsShouldBeCreatedInCloud(veleroCfg.CloudProvider,
veleroCfg.CloudCredentialsFile, veleroCfg.BSLBucket,
veleroCfg.BSLConfig, backupName, snapshotCheckPoint)).To(Succeed())
})
if !OriginVeleroCfg.SnapshotMoveData {
By("Snapshot should be created in cloud object store", func() {
snapshotCheckPoint, err := GetSnapshotCheckPoint(*veleroCfg.DefaultClient, veleroCfg, 2,
migrationNamespace, backupName, KibishiiPVCNameList)
Expect(err).NotTo(HaveOccurred(), "Fail to get snapshot checkpoint")
Expect(SnapshotsShouldBeCreatedInCloud(veleroCfg.CloudProvider,
veleroCfg.CloudCredentialsFile, veleroCfg.BSLBucket,
veleroCfg.BSLConfig, backupName, snapshotCheckPoint)).To(Succeed())
})
} else {
//TODO: checkpoint for datamover
}
}
if useVolumeSnapshots && veleroCfg.CloudProvider == "azure" && strings.EqualFold(veleroCfg.Features, "EnableCSI") {
// Upgrade test is not running daily since no CSI plugin v1.0 released, because builds before
// v1.0 have issues to fail upgrade case.
if useVolumeSnapshots && veleroCfg.CloudProvider == "azure" &&
strings.EqualFold(veleroCfg.Features, "EnableCSI") &&
!OriginVeleroCfg.SnapshotMoveData {
By("Sleep 5 minutes to avoid snapshot recreated by unknown reason ", func() {
time.Sleep(5 * time.Minute)
})
@@ -233,7 +253,7 @@ func MigrationTest(useVolumeSnapshots bool, veleroCLI2Version VeleroCLI2Version)
// the snapshots of AWS may be still in pending status when do the restore, wait for a while
// to avoid this https://github.com/vmware-tanzu/velero/issues/1799
// TODO remove this after https://github.com/vmware-tanzu/velero/issues/3533 is fixed
if veleroCfg.CloudProvider == "aws" && useVolumeSnapshots {
if veleroCfg.CloudProvider == "aws" && useVolumeSnapshots && !OriginVeleroCfg.SnapshotMoveData {
fmt.Println("Waiting 5 minutes to make sure the snapshots are ready...")
time.Sleep(5 * time.Minute)
}
@@ -253,7 +273,10 @@ func MigrationTest(useVolumeSnapshots bool, veleroCLI2Version VeleroCLI2Version)
veleroCfg.ClientToInstallVelero = veleroCfg.StandbyClient
veleroCfg.UseNodeAgent = !useVolumeSnapshots
veleroCfg.UseRestic = false
Expect(VeleroInstall(context.Background(), &veleroCfg)).To(Succeed())
if veleroCfg.SnapshotMoveData {
veleroCfg.UseNodeAgent = true
}
Expect(VeleroInstall(context.Background(), &veleroCfg, true)).To(Succeed())
})
By(fmt.Sprintf("Waiting for backups sync to Velero in cluster-B (%s)", veleroCfg.StandbyCluster), func() {
@@ -262,12 +285,27 @@ func MigrationTest(useVolumeSnapshots bool, veleroCLI2Version VeleroCLI2Version)
})
By(fmt.Sprintf("Restore %s", migrationNamespace), func() {
Expect(VeleroRestore(context.Background(), veleroCfg.VeleroCLI,
veleroCfg.VeleroNamespace, restoreScName, backupScName, "StorageClass")).To(Succeed(), func() string {
RunDebug(context.Background(), veleroCfg.VeleroCLI,
veleroCfg.VeleroNamespace, "", restoreName)
return "Fail to restore workload"
})
if OriginVeleroCfg.SnapshotMoveData {
By(fmt.Sprintf("Create a storage class %s for restore PV provisioned by storage class %s on different cloud provider", StorageClassName, KibishiiStorageClassName), func() {
Expect(InstallStorageClass(context.Background(), fmt.Sprintf("testdata/storage-class/%s.yaml", veleroCfg.StandbyClusterCloudProvider))).To(Succeed())
})
configmaptName := "datamover-storage-class-config"
labels := map[string]string{"velero.io/change-storage-class": "RestoreItemAction",
"velero.io/plugin-config": ""}
data := map[string]string{KibishiiStorageClassName: StorageClassName}
By(fmt.Sprintf("Create ConfigMap %s in namespace %s", configmaptName, veleroCfg.VeleroNamespace), func() {
_, err := CreateConfigMap(veleroCfg.StandbyClient.ClientGo, veleroCfg.VeleroNamespace, configmaptName, labels, data)
Expect(err).To(Succeed(), fmt.Sprintf("failed to create configmap in the namespace %q", veleroCfg.VeleroNamespace))
})
} else {
Expect(VeleroRestore(context.Background(), veleroCfg.VeleroCLI,
veleroCfg.VeleroNamespace, restoreScName, backupScName, "StorageClass")).To(Succeed(), func() string {
RunDebug(context.Background(), veleroCfg.VeleroCLI,
veleroCfg.VeleroNamespace, "", restoreName)
return "Fail to restore workload"
})
}
Expect(VeleroRestore(context.Background(), veleroCfg.VeleroCLI,
veleroCfg.VeleroNamespace, restoreName, backupName, "")).To(Succeed(), func() string {
RunDebug(context.Background(), veleroCfg.VeleroCLI,
@@ -278,7 +316,7 @@ func MigrationTest(useVolumeSnapshots bool, veleroCLI2Version VeleroCLI2Version)
By(fmt.Sprintf("Verify workload %s after restore ", migrationNamespace), func() {
Expect(KibishiiVerifyAfterRestore(*veleroCfg.StandbyClient, migrationNamespace,
oneHourTimeout, DefaultKibishiiData)).To(Succeed(), "Fail to verify workload after restore")
oneHourTimeout, &KibishiiData)).To(Succeed(), "Fail to verify workload after restore")
})
})
})

View File

@@ -72,6 +72,9 @@ type VeleroConfig struct {
DefaultVolumesToFsBackup bool
UseVolumeSnapshots bool
VeleroServerDebugMode bool
SnapshotMoveData bool
DataMoverPlugin string
StandbyClusterCloudProvider string
}
type SnapshotCheckPoint struct {
@@ -98,6 +101,7 @@ type BackupConfig struct {
OrderedResources string
UseResticIfFSBackup bool
DefaultVolumesToFsBackup bool
SnapshotMoveData bool
}
type VeleroCLI2Version struct {

View File

@@ -136,7 +136,7 @@ func BackupUpgradeRestoreTest(useVolumeSnapshots bool, veleroCLI2Version VeleroC
tmpCfgForOldVeleroInstall.UseNodeAgent = false
}
Expect(VeleroInstall(context.Background(), &tmpCfgForOldVeleroInstall)).To(Succeed())
Expect(VeleroInstall(context.Background(), &tmpCfgForOldVeleroInstall, false)).To(Succeed())
Expect(CheckVeleroVersion(context.Background(), tmpCfgForOldVeleroInstall.VeleroCLI,
tmpCfgForOldVeleroInstall.UpgradeFromVeleroVersion)).To(Succeed())
})
@@ -223,7 +223,7 @@ func BackupUpgradeRestoreTest(useVolumeSnapshots bool, veleroCLI2Version VeleroC
tmpCfg.UseNodeAgent = !useVolumeSnapshots
Expect(err).To(Succeed())
if supportUploaderType {
Expect(VeleroInstall(context.Background(), &tmpCfg)).To(Succeed())
Expect(VeleroInstall(context.Background(), &tmpCfg, false)).To(Succeed())
Expect(CheckVeleroVersion(context.Background(), tmpCfg.VeleroCLI,
tmpCfg.VeleroVersion)).To(Succeed())
} else {

View File

@@ -200,6 +200,7 @@ func AddLabelToCRD(ctx context.Context, crd, label string) error {
func KubectlApplyByFile(ctx context.Context, file string) error {
args := []string{"apply", "-f", file, "--force=true"}
fmt.Println(args)
return exec.CommandContext(ctx, "kubectl", args...).Run()
}

View File

@@ -0,0 +1,39 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package k8s
import (
"fmt"
"os/exec"
"github.com/pkg/errors"
"golang.org/x/net/context"
veleroexec "github.com/vmware-tanzu/velero/pkg/util/exec"
)
// ScaleStatefulSet scales the StatefulSet called name in the given namespace
// to the requested number of replicas by running
// `kubectl scale statefulsets <name> --replicas=<replicas> -n <namespace>`.
//
// It returns nil when the command succeeds. When the command fails, the
// returned error wraps the command error with the stderr output captured by
// veleroexec.RunCommand.
func ScaleStatefulSet(ctx context.Context, namespace, name string, replicas int) error {
	cmd := exec.CommandContext(ctx, "kubectl", "scale", "statefulsets", name, fmt.Sprintf("--replicas=%d", replicas), "-n", namespace)
	// Report both the StatefulSet name and the namespace it lives in (the
	// previous message labelled the name as the namespace), and terminate the
	// line so subsequent output does not run onto it.
	fmt.Printf("Scale stateful set %s in namespace %s with CMD: %s\n", name, namespace, cmd.Args)

	_, stderr, err := veleroexec.RunCommand(cmd)
	if err != nil {
		return errors.Wrap(err, stderr)
	}
	return nil
}

View File

@@ -48,8 +48,11 @@ type KibishiiData struct {
ExpectedNodes int
}
var DefaultKibishiiData = &KibishiiData{2, 10, 10, 1024, 1024, 0, 2}
var DefaultKibishiiWorkerCounts = 2
var DefaultKibishiiData = &KibishiiData{2, 10, 10, 1024, 1024, 0, DefaultKibishiiWorkerCounts}
var KibishiiPVCNameList = []string{"kibishii-data-kibishii-deployment-0", "kibishii-data-kibishii-deployment-1"}
var KibishiiStorageClassName = "kibishii-storage-class"
// RunKibishiiTests runs kibishii tests on the provider.
func RunKibishiiTests(veleroCfg VeleroConfig, backupName, restoreName, backupLocation, kibishiiNamespace string,
@@ -196,11 +199,15 @@ func RunKibishiiTests(veleroCfg VeleroConfig, backupName, restoreName, backupLoc
}
func installKibishii(ctx context.Context, namespace string, cloudPlatform, veleroFeatures,
kibishiiDirectory string, useVolumeSnapshots bool) error {
kibishiiDirectory string, useVolumeSnapshots bool, workerReplicas int) error {
if strings.EqualFold(cloudPlatform, "azure") &&
strings.EqualFold(veleroFeatures, "EnableCSI") {
cloudPlatform = "azure-csi"
}
if strings.EqualFold(cloudPlatform, "aws") &&
strings.EqualFold(veleroFeatures, "EnableCSI") {
cloudPlatform = "aws-csi"
}
// We use kustomize to generate YAML for Kibishii from the checked-in yaml directories
kibishiiInstallCmd := exec.CommandContext(ctx, "kubectl", "apply", "-n", namespace, "-k",
kibishiiDirectory+cloudPlatform, "--timeout=90s")
@@ -216,6 +223,12 @@ func installKibishii(ctx context.Context, namespace string, cloudPlatform, veler
if err != nil {
return errors.Wrapf(err, "failed to label namespace with PSA policy, stderr=%s", stderr)
}
if workerReplicas != DefaultKibishiiWorkerCounts {
err = ScaleStatefulSet(ctx, namespace, "kibishii-deployment", workerReplicas)
if err != nil {
return errors.Wrapf(err, "failed to scale statefulset, stderr=%s", err.Error())
}
}
kibishiiSetWaitCmd := exec.CommandContext(ctx, "kubectl", "rollout", "status", "statefulset.apps/kibishii-deployment",
"-n", namespace, "-w", "--timeout=30m")
@@ -311,7 +324,7 @@ func KibishiiPrepareBeforeBackup(oneHourTimeout context.Context, client TestClie
}
if err := installKibishii(oneHourTimeout, kibishiiNamespace, providerName, veleroFeatures,
kibishiiDirectory, useVolumeSnapshots); err != nil {
kibishiiDirectory, useVolumeSnapshots, kibishiiData.ExpectedNodes); err != nil {
return errors.Wrap(err, "Failed to install Kibishii workload")
}
// wait for kibishii pod startup

View File

@@ -55,8 +55,23 @@ type installOptions struct {
VeleroServerDebugMode bool
}
func VeleroInstall(ctx context.Context, veleroCfg *VeleroConfig) error {
func VeleroInstall(ctx context.Context, veleroCfg *VeleroConfig, isStandbyCluster bool) error {
fmt.Printf("Velero install %s\n", time.Now().Format("2006-01-02 15:04:05"))
// The veleroCfg struct includes a set of BSL params and a set of additional BSL params.
// The additional BSL set is for the additional BSL test only, so only the default BSL set
// is effective for VeleroInstall().
//
// The veleroCfg struct also includes 2 sets of cluster settings, but VeleroInstall() only reads
// the default cluster settings, so if an E2E test needs to install on the standby cluster, the
// default cluster settings should be reset to the standby cluster's values.
//
// Some other settings, like "snapshotMoveData", might not be needed for the standby cluster
// installation, because only restore is performed in the standby cluster, so the CSI plugin is
// not needed there. It is still installed because only one veleroCfg is provided in the current
// design; since this is not expected to introduce any issues, keep it intact for now.
if isStandbyCluster {
veleroCfg.CloudProvider = veleroCfg.StandbyClusterCloudProvider
}
if veleroCfg.CloudProvider != "kind" {
fmt.Printf("For cloud platforms, object store plugin provider will be set as cloud provider")
// If ObjectStoreProvider is not provided, then using the value same as CloudProvider
@@ -69,7 +84,7 @@ func VeleroInstall(ctx context.Context, veleroCfg *VeleroConfig) error {
}
}
providerPluginsTmp, err := getProviderPlugins(ctx, veleroCfg.VeleroCLI, veleroCfg.ObjectStoreProvider, veleroCfg.CloudProvider, veleroCfg.Plugins, veleroCfg.Features)
pluginsTmp, err := getPlugins(ctx, *veleroCfg)
if err != nil {
return errors.WithMessage(err, "Failed to get provider plugins")
}
@@ -91,22 +106,19 @@ func VeleroInstall(ctx context.Context, veleroCfg *VeleroConfig) error {
}
}
veleroInstallOptions, err := getProviderVeleroInstallOptions(veleroCfg, providerPluginsTmp)
veleroInstallOptions, err := getProviderVeleroInstallOptions(veleroCfg, pluginsTmp)
if err != nil {
return errors.WithMessagef(err, "Failed to get Velero InstallOptions for plugin provider %s", veleroCfg.ObjectStoreProvider)
}
veleroInstallOptions.UseVolumeSnapshots = veleroCfg.UseVolumeSnapshots
if !veleroCfg.UseRestic {
veleroInstallOptions.UseNodeAgent = veleroCfg.UseNodeAgent
}
veleroInstallOptions.UseRestic = veleroCfg.UseRestic
veleroInstallOptions.UseNodeAgent = veleroCfg.UseNodeAgent
veleroInstallOptions.Image = veleroCfg.VeleroImage
veleroInstallOptions.Namespace = veleroCfg.VeleroNamespace
veleroInstallOptions.UploaderType = veleroCfg.UploaderType
GCFrequency, _ := time.ParseDuration(veleroCfg.GCFrequency)
veleroInstallOptions.GarbageCollectionFrequency = GCFrequency
err = installVeleroServer(ctx, veleroCfg.VeleroCLI, &installOptions{
err = installVeleroServer(ctx, veleroCfg.VeleroCLI, veleroCfg.CloudProvider, &installOptions{
Options: veleroInstallOptions,
RegistryCredentialFile: veleroCfg.RegistryCredentialFile,
RestoreHelperImage: veleroCfg.RestoreHelperImage,
@@ -176,7 +188,7 @@ func clearupvSpherePluginConfig(c clientset.Interface, ns, secretName, configMap
return nil
}
func installVeleroServer(ctx context.Context, cli string, options *installOptions) error {
func installVeleroServer(ctx context.Context, cli, cloudProvider string, options *installOptions) error {
args := []string{"install"}
namespace := "velero"
if len(options.Namespace) > 0 {
@@ -192,9 +204,6 @@ func installVeleroServer(ctx context.Context, cli string, options *installOption
if options.DefaultVolumesToFsBackup {
args = append(args, "--default-volumes-to-fs-backup")
}
if options.UseRestic {
args = append(args, "--use-restic")
}
if options.UseVolumeSnapshots {
args = append(args, "--use-volume-snapshots")
}
@@ -219,10 +228,11 @@ func installVeleroServer(ctx context.Context, cli string, options *installOption
if len(options.Plugins) > 0 {
args = append(args, "--plugins", options.Plugins.String())
}
fmt.Println("Start to install Azure VolumeSnapshotClass ...")
if len(options.Features) > 0 {
args = append(args, "--features", options.Features)
if strings.EqualFold(options.Features, "EnableCSI") && options.UseVolumeSnapshots {
if strings.EqualFold(options.ProviderName, "Azure") {
if strings.EqualFold(cloudProvider, "azure") {
if err := KubectlApplyByFile(ctx, "util/csi/AzureVolumeSnapshotClass.yaml"); err != nil {
return err
}
@@ -528,7 +538,7 @@ func PrepareVelero(ctx context.Context, caseName string) error {
return nil
}
fmt.Printf("need to install velero for case %s \n", caseName)
return VeleroInstall(context.Background(), &VeleroCfg)
return VeleroInstall(context.Background(), &VeleroCfg, false)
}
func VeleroUninstall(ctx context.Context, cli, namespace string) error {

View File

@@ -34,6 +34,7 @@ import (
"time"
"github.com/pkg/errors"
"golang.org/x/exp/slices"
"k8s.io/apimachinery/pkg/util/wait"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
@@ -70,15 +71,16 @@ var pluginsMatrix = map[string]map[string][]string{
"csi": {"velero/velero-plugin-for-csi:v0.5.0"},
},
"main": {
"aws": {"velero/velero-plugin-for-aws:main"},
"azure": {"velero/velero-plugin-for-microsoft-azure:main"},
"vsphere": {"vsphereveleroplugin/velero-plugin-for-vsphere:v1.5.1"},
"gcp": {"velero/velero-plugin-for-gcp:main"},
"csi": {"velero/velero-plugin-for-csi:main"},
"aws": {"velero/velero-plugin-for-aws:main"},
"azure": {"velero/velero-plugin-for-microsoft-azure:main"},
"vsphere": {"vsphereveleroplugin/velero-plugin-for-vsphere:v1.5.1"},
"gcp": {"velero/velero-plugin-for-gcp:main"},
"csi": {"velero/velero-plugin-for-csi:main"},
"datamover": {"velero/velero-plugin-for-aws:main"},
},
}
func GetProviderPluginsByVersion(version, providerName, feature string) ([]string, error) {
func getPluginsByVersion(version, cloudProvider, objectStoreProvider, feature string, needDataMoverPlugin bool) ([]string, error) {
var cloudMap map[string][]string
arr := strings.Split(version, ".")
if len(arr) >= 3 {
@@ -92,17 +94,47 @@ func GetProviderPluginsByVersion(version, providerName, feature string) ([]strin
}
var pluginsForFeature []string
plugins, ok := cloudMap[providerName]
if !ok {
return nil, errors.Errorf("fail to get plugins by version: %s and provider %s", version, providerName)
if cloudProvider == "kind" {
plugins, ok := cloudMap["aws"]
if !ok {
return nil, errors.Errorf("fail to get plugins by version: %s and provider %s", version, cloudProvider)
}
return plugins, nil
}
plugins, ok := cloudMap[cloudProvider]
if !ok {
return nil, errors.Errorf("fail to get plugins by version: %s and provider %s", version, cloudProvider)
}
if objectStoreProvider != cloudProvider {
pluginsForObjectStoreProvider, ok := cloudMap[objectStoreProvider]
if !ok {
return nil, errors.Errorf("fail to get plugins by version: %s and object store provider %s", version, objectStoreProvider)
}
plugins = append(plugins, pluginsForObjectStoreProvider...)
}
if strings.EqualFold(feature, "EnableCSI") {
pluginsForFeature, ok = cloudMap["csi"]
if !ok {
return nil, errors.Errorf("fail to get plugins by version: %s and provider %s", version, providerName)
return nil, errors.Errorf("fail to get CSI plugins by version: %s ", version)
}
plugins = append(plugins, pluginsForFeature...)
}
return append(plugins, pluginsForFeature...), nil
if needDataMoverPlugin {
pluginsForDatamover, ok := cloudMap["datamover"]
if !ok {
return nil, errors.Errorf("fail to get plugins by for datamover")
}
for _, p := range pluginsForDatamover {
if !slices.Contains(plugins, p) {
plugins = append(plugins, pluginsForDatamover...)
}
}
}
return plugins, nil
}
// getProviderVeleroInstallOptions returns Velero InstallOptions for the provider.
@@ -280,6 +312,10 @@ func VeleroBackupNamespace(ctx context.Context, veleroCLI, veleroNamespace strin
args = append(args, "--selector", backupCfg.Selector)
}
if backupCfg.SnapshotMoveData {
args = append(args, "--snapshot-move-data")
}
if backupCfg.UseVolumeSnapshots {
if backupCfg.ProvideSnapshotsVolumeParam {
args = append(args, "--snapshot-volumes")
@@ -516,36 +552,67 @@ func VeleroVersion(ctx context.Context, veleroCLI, veleroNamespace string) error
return nil
}
func getProviderPlugins(ctx context.Context, veleroCLI, objectStoreProvider, cloudProvider, providerPlugins, feature string) ([]string, error) {
// getProviderPlugins only provides the plugins for a specific cloud provider.
func getProviderPlugins(ctx context.Context, veleroCLI string, cloudProvider string) ([]string, error) {
if cloudProvider == "" {
return []string{}, errors.New("CloudProvider should be provided")
}
version, err := getVeleroVersion(ctx, veleroCLI, true)
if err != nil {
return nil, errors.WithMessage(err, "failed to get velero version")
}
plugins, err := getPluginsByVersion(version, cloudProvider, cloudProvider, "", false)
if err != nil {
return nil, errors.WithMessagef(err, "Fail to get plugin by provider %s and version %s", cloudProvider, version)
}
return plugins, nil
}
// getPlugins collects all kinds of plugins for VeleroInstall: provider plugins
// (cloud provider/object store provider; if the object store provider is not
// provided, it is set to the same value as the cloud provider) and feature plugins (CSI/Datamover).
func getPlugins(ctx context.Context, veleroCfg VeleroConfig) ([]string, error) {
veleroCLI := veleroCfg.VeleroCLI
cloudProvider := veleroCfg.CloudProvider
objectStoreProvider := veleroCfg.ObjectStoreProvider
providerPlugins := veleroCfg.Plugins
feature := veleroCfg.Features
needDataMoverPlugin := false
// Fetch the plugins for the provider before checking for the object store provider below.
var plugins []string
if len(providerPlugins) > 0 {
plugins = strings.Split(providerPlugins, ",")
} else {
if cloudProvider == "" {
return []string{}, errors.New("CloudProvider should be provided")
}
if objectStoreProvider == "" {
objectStoreProvider = cloudProvider
}
version, err := getVeleroVersion(ctx, veleroCLI, true)
if err != nil {
return nil, errors.WithMessage(err, "failed to get velero version")
}
plugins, err = GetProviderPluginsByVersion(version, objectStoreProvider, feature)
if veleroCfg.SnapshotMoveData && veleroCfg.DataMoverPlugin == "" {
needDataMoverPlugin = true
}
plugins, err = getPluginsByVersion(version, cloudProvider, objectStoreProvider, feature, needDataMoverPlugin)
if err != nil {
return nil, errors.WithMessagef(err, "Fail to get plugin by provider %s and version %s", objectStoreProvider, version)
}
if objectStoreProvider != "" && cloudProvider != "kind" && objectStoreProvider != cloudProvider {
pluginsTmp, err := GetProviderPluginsByVersion(version, cloudProvider, feature)
if err != nil {
return nil, errors.WithMessage(err, "failed to get velero version")
}
plugins = append(plugins, pluginsTmp...)
}
}
return plugins, nil
}
// VeleroAddPluginsForProvider determines which plugins need to be installed for a provider and
// installs them in the current Velero installation, skipping over those that are already installed.
func VeleroAddPluginsForProvider(ctx context.Context, veleroCLI string, veleroNamespace string, provider string, addPlugins, feature string) error {
plugins, err := getProviderPlugins(ctx, veleroCLI, provider, provider, addPlugins, feature)
fmt.Printf("addPlugins cmd =%v\n", addPlugins)
func VeleroAddPluginsForProvider(ctx context.Context, veleroCLI string, veleroNamespace string, provider string) error {
plugins, err := getProviderPlugins(ctx, veleroCLI, provider)
fmt.Printf("provider cmd = %v\n", provider)
fmt.Printf("plugins cmd = %v\n", plugins)
if err != nil {