Merge pull request #8046 from Lyndon-Li/data-mover-ms-backup-1

Data mover micro service backup
This commit is contained in:
lyndon-li
2024-07-30 16:24:22 +08:00
committed by GitHub
9 changed files with 1557 additions and 8 deletions

View File

@@ -0,0 +1 @@
Data mover micro service backup according to design #7576

View File

@@ -14,16 +14,39 @@ limitations under the License.
package datamover
import (
"context"
"fmt"
"os"
"strings"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/pkg/buildinfo"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
"github.com/vmware-tanzu/velero/pkg/datamover"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"
ctrl "sigs.k8s.io/controller-runtime"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
ctlcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
)
type dataMoverBackupConfig struct {
@@ -52,7 +75,10 @@ func NewBackupCommand(f client.Factory) *cobra.Command {
logger.Infof("Starting Velero data-mover backup %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA())
f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
s := newdataMoverBackup(logger, config)
s, err := newdataMoverBackup(logger, f, config)
if err != nil {
exitWithMessage(logger, false, "Failed to create data mover backup, %v", err)
}
s.run()
},
@@ -73,20 +99,187 @@ func NewBackupCommand(f client.Factory) *cobra.Command {
return command
}
const (
// defaultCredentialsDirectory is the path on disk where credential
// files will be written to
defaultCredentialsDirectory = "/tmp/credentials"
)
type dataMoverBackup struct {
logger logrus.FieldLogger
config dataMoverBackupConfig
logger logrus.FieldLogger
ctx context.Context
cancelFunc context.CancelFunc
client ctlclient.Client
cache ctlcache.Cache
namespace string
nodeName string
config dataMoverBackupConfig
kubeClient kubernetes.Interface
dataPathMgr *datapath.Manager
}
func newdataMoverBackup(logger logrus.FieldLogger, config dataMoverBackupConfig) *dataMoverBackup {
s := &dataMoverBackup{
logger: logger,
config: config,
func newdataMoverBackup(logger logrus.FieldLogger, factory client.Factory, config dataMoverBackupConfig) (*dataMoverBackup, error) {
ctx, cancelFunc := context.WithCancel(context.Background())
clientConfig, err := factory.ClientConfig()
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client config")
}
return s
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
scheme := runtime.NewScheme()
if err := velerov1api.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add velero v1 scheme")
}
if err := velerov2alpha1api.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add velero v2alpha1 scheme")
}
if err := v1.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to add core v1 scheme")
}
nodeName := os.Getenv("NODE_NAME")
// use a field selector to filter to only pods scheduled on this node.
cacheOption := ctlcache.Options{
Scheme: scheme,
ByObject: map[ctlclient.Object]ctlcache.ByObject{
&v1.Pod{}: {
Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(),
},
&velerov2alpha1api.DataUpload{}: {
Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(),
},
},
}
cli, err := ctlclient.New(clientConfig, ctlclient.Options{
Scheme: scheme,
})
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client")
}
cache, err := ctlcache.New(clientConfig, cacheOption)
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client cache")
}
s := &dataMoverBackup{
logger: logger,
ctx: ctx,
cancelFunc: cancelFunc,
client: cli,
cache: cache,
config: config,
namespace: factory.Namespace(),
nodeName: nodeName,
}
s.kubeClient, err = factory.KubeClient()
if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create kube client")
}
s.dataPathMgr = datapath.NewManager(1)
return s, nil
}
var funcExitWithMessage = exitWithMessage
var funcCreateDataPathService = (*dataMoverBackup).createDataPathService
func (s *dataMoverBackup) run() {
signals.CancelOnShutdown(s.cancelFunc, s.logger)
go func() {
if err := s.cache.Start(s.ctx); err != nil {
s.logger.WithError(err).Warn("error starting cache")
}
}()
// TODOOO: call s.runDataPath()
time.Sleep(time.Duration(1<<63 - 1))
}
func (s *dataMoverBackup) runDataPath() {
s.logger.Infof("Starting micro service in node %s for du %s", s.nodeName, s.config.duName)
dpService, err := funcCreateDataPathService(s)
if err != nil {
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to create data path service for DataUpload %s: %v", s.config.duName, err)
return
}
s.logger.Infof("Starting data path service %s", s.config.duName)
err = dpService.Init()
if err != nil {
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to init data path service for DataUpload %s: %v", s.config.duName, err)
return
}
s.logger.Infof("Running data path service %s", s.config.duName)
result, err := dpService.RunCancelableDataPath(s.ctx)
if err != nil {
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to run data path service for DataUpload %s: %v", s.config.duName, err)
return
}
s.logger.WithField("du", s.config.duName).Info("Data path service completed")
dpService.Shutdown()
s.logger.WithField("du", s.config.duName).Info("Data path service is shut down")
s.cancelFunc()
funcExitWithMessage(s.logger, true, result)
}
var funcNewCredentialFileStore = credentials.NewNamespacedFileStore
var funcNewCredentialSecretStore = credentials.NewNamespacedSecretStore
func (s *dataMoverBackup) createDataPathService() (dataPathService, error) {
credentialFileStore, err := funcNewCredentialFileStore(
s.client,
s.namespace,
defaultCredentialsDirectory,
filesystem.NewFileSystem(),
)
if err != nil {
return nil, errors.Wrapf(err, "error to create credential file store")
}
credSecretStore, err := funcNewCredentialSecretStore(s.client, s.namespace)
if err != nil {
return nil, errors.Wrapf(err, "error to create credential secret store")
}
credGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}
duInformer, err := s.cache.GetInformer(s.ctx, &velerov2alpha1api.DataUpload{})
if err != nil {
return nil, errors.Wrap(err, "error to get controller-runtime informer from manager")
}
repoEnsurer := repository.NewEnsurer(s.client, s.logger, s.config.resourceTimeout)
return datamover.NewBackupMicroService(s.ctx, s.client, s.kubeClient, s.config.duName, s.namespace, s.nodeName, datapath.AccessPoint{
ByPath: s.config.volumePath,
VolMode: uploader.PersistentVolumeMode(s.config.volumeMode),
}, s.dataPathMgr, repoEnsurer, credGetter, duInformer, s.logger), nil
}

View File

@@ -0,0 +1,216 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package datamover
import (
"context"
"errors"
"fmt"
"testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
ctlclient "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
cacheMock "github.com/vmware-tanzu/velero/pkg/cmd/cli/datamover/mocks"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)
func fakeCreateDataPathServiceWithErr(_ *dataMoverBackup) (dataPathService, error) {
return nil, errors.New("fake-create-data-path-error")
}
var frHelper *fakeRunHelper
func fakeCreateDataPathService(_ *dataMoverBackup) (dataPathService, error) {
return frHelper, nil
}
type fakeRunHelper struct {
initErr error
runCancelableDataPathErr error
runCancelableDataPathResult string
exitMessage string
succeed bool
}
func (fr *fakeRunHelper) Init() error {
return fr.initErr
}
func (fr *fakeRunHelper) RunCancelableDataPath(_ context.Context) (string, error) {
if fr.runCancelableDataPathErr != nil {
return "", fr.runCancelableDataPathErr
} else {
return fr.runCancelableDataPathResult, nil
}
}
func (fr *fakeRunHelper) Shutdown() {
}
func (fr *fakeRunHelper) ExitWithMessage(logger logrus.FieldLogger, succeed bool, message string, a ...any) {
fr.succeed = succeed
fr.exitMessage = fmt.Sprintf(message, a...)
}
func TestRunDataPath(t *testing.T) {
tests := []struct {
name string
duName string
createDataPathFail bool
initDataPathErr error
runCancelableDataPathErr error
runCancelableDataPathResult string
expectedMessage string
expectedSucceed bool
}{
{
name: "create data path failed",
duName: "fake-name",
createDataPathFail: true,
expectedMessage: "Failed to create data path service for DataUpload fake-name: fake-create-data-path-error",
},
{
name: "init data path failed",
duName: "fake-name",
initDataPathErr: errors.New("fake-init-data-path-error"),
expectedMessage: "Failed to init data path service for DataUpload fake-name: fake-init-data-path-error",
},
{
name: "run data path failed",
duName: "fake-name",
runCancelableDataPathErr: errors.New("fake-run-data-path-error"),
expectedMessage: "Failed to run data path service for DataUpload fake-name: fake-run-data-path-error",
},
{
name: "succeed",
duName: "fake-name",
runCancelableDataPathResult: "fake-run-data-path-result",
expectedMessage: "fake-run-data-path-result",
expectedSucceed: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
frHelper = &fakeRunHelper{
initErr: test.initDataPathErr,
runCancelableDataPathErr: test.runCancelableDataPathErr,
runCancelableDataPathResult: test.runCancelableDataPathResult,
}
if test.createDataPathFail {
funcCreateDataPathService = fakeCreateDataPathServiceWithErr
} else {
funcCreateDataPathService = fakeCreateDataPathService
}
funcExitWithMessage = frHelper.ExitWithMessage
s := &dataMoverBackup{
logger: velerotest.NewLogger(),
cancelFunc: func() {},
config: dataMoverBackupConfig{
duName: test.duName,
},
}
s.runDataPath()
assert.Equal(t, test.expectedMessage, frHelper.exitMessage)
assert.Equal(t, test.expectedSucceed, frHelper.succeed)
})
}
}
type fakeCreateDataPathServiceHelper struct {
fileStoreErr error
secretStoreErr error
}
func (fc *fakeCreateDataPathServiceHelper) NewNamespacedFileStore(_ ctlclient.Client, _ string, _ string, _ filesystem.Interface) (credentials.FileStore, error) {
return nil, fc.fileStoreErr
}
func (fc *fakeCreateDataPathServiceHelper) NewNamespacedSecretStore(_ ctlclient.Client, _ string) (credentials.SecretStore, error) {
return nil, fc.secretStoreErr
}
func TestCreateDataPathService(t *testing.T) {
tests := []struct {
name string
fileStoreErr error
secretStoreErr error
mockGetInformer bool
getInformerErr error
expectedError string
}{
{
name: "create credential file store error",
fileStoreErr: errors.New("fake-file-store-error"),
expectedError: "error to create credential file store: fake-file-store-error",
},
{
name: "create credential secret store",
secretStoreErr: errors.New("fake-secret-store-error"),
expectedError: "error to create credential secret store: fake-secret-store-error",
},
{
name: "get informer error",
mockGetInformer: true,
getInformerErr: errors.New("fake-get-informer-error"),
expectedError: "error to get controller-runtime informer from manager: fake-get-informer-error",
},
{
name: "succeed",
mockGetInformer: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fcHelper := &fakeCreateDataPathServiceHelper{
fileStoreErr: test.fileStoreErr,
secretStoreErr: test.secretStoreErr,
}
funcNewCredentialFileStore = fcHelper.NewNamespacedFileStore
funcNewCredentialSecretStore = fcHelper.NewNamespacedSecretStore
cache := cacheMock.NewCache(t)
if test.mockGetInformer {
cache.On("GetInformer", mock.Anything, mock.Anything).Return(nil, test.getInformerErr)
}
funcExitWithMessage = frHelper.ExitWithMessage
s := &dataMoverBackup{
cache: cache,
}
_, err := s.createDataPathService()
if test.expectedError != "" {
assert.EqualError(t, err, test.expectedError)
} else {
assert.NoError(t, err)
}
})
}
}

View File

@@ -14,6 +14,7 @@ limitations under the License.
package datamover
import (
"context"
"fmt"
"os"
@@ -39,6 +40,12 @@ func NewCommand(f client.Factory) *cobra.Command {
return command
}
type dataPathService interface {
Init() error
RunCancelableDataPath(context.Context) (string, error)
Shutdown()
}
var funcExit = os.Exit
var funcCreateFile = os.Create

View File

@@ -0,0 +1,231 @@
// Code generated by mockery v2.39.1. DO NOT EDIT.
package mocks
import (
cache "sigs.k8s.io/controller-runtime/pkg/cache"
client "sigs.k8s.io/controller-runtime/pkg/client"
context "context"
mock "github.com/stretchr/testify/mock"
schema "k8s.io/apimachinery/pkg/runtime/schema"
types "k8s.io/apimachinery/pkg/types"
)
// Cache is an autogenerated mock type for the Cache type
type Cache struct {
mock.Mock
}
// Get provides a mock function with given fields: ctx, key, obj, opts
func (_m *Cache) Get(ctx context.Context, key types.NamespacedName, obj client.Object, opts ...client.GetOption) error {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, key, obj)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for Get")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, types.NamespacedName, client.Object, ...client.GetOption) error); ok {
r0 = rf(ctx, key, obj, opts...)
} else {
r0 = ret.Error(0)
}
return r0
}
// GetInformer provides a mock function with given fields: ctx, obj, opts
func (_m *Cache) GetInformer(ctx context.Context, obj client.Object, opts ...cache.InformerGetOption) (cache.Informer, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, obj)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for GetInformer")
}
var r0 cache.Informer
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, client.Object, ...cache.InformerGetOption) (cache.Informer, error)); ok {
return rf(ctx, obj, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, client.Object, ...cache.InformerGetOption) cache.Informer); ok {
r0 = rf(ctx, obj, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(cache.Informer)
}
}
if rf, ok := ret.Get(1).(func(context.Context, client.Object, ...cache.InformerGetOption) error); ok {
r1 = rf(ctx, obj, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetInformerForKind provides a mock function with given fields: ctx, gvk, opts
func (_m *Cache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...cache.InformerGetOption) (cache.Informer, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, gvk)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for GetInformerForKind")
}
var r0 cache.Informer
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, schema.GroupVersionKind, ...cache.InformerGetOption) (cache.Informer, error)); ok {
return rf(ctx, gvk, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, schema.GroupVersionKind, ...cache.InformerGetOption) cache.Informer); ok {
r0 = rf(ctx, gvk, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(cache.Informer)
}
}
if rf, ok := ret.Get(1).(func(context.Context, schema.GroupVersionKind, ...cache.InformerGetOption) error); ok {
r1 = rf(ctx, gvk, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// IndexField provides a mock function with given fields: ctx, obj, field, extractValue
func (_m *Cache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
ret := _m.Called(ctx, obj, field, extractValue)
if len(ret) == 0 {
panic("no return value specified for IndexField")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, client.Object, string, client.IndexerFunc) error); ok {
r0 = rf(ctx, obj, field, extractValue)
} else {
r0 = ret.Error(0)
}
return r0
}
// List provides a mock function with given fields: ctx, list, opts
func (_m *Cache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, list)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for List")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, client.ObjectList, ...client.ListOption) error); ok {
r0 = rf(ctx, list, opts...)
} else {
r0 = ret.Error(0)
}
return r0
}
// RemoveInformer provides a mock function with given fields: ctx, obj
func (_m *Cache) RemoveInformer(ctx context.Context, obj client.Object) error {
ret := _m.Called(ctx, obj)
if len(ret) == 0 {
panic("no return value specified for RemoveInformer")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, client.Object) error); ok {
r0 = rf(ctx, obj)
} else {
r0 = ret.Error(0)
}
return r0
}
// Start provides a mock function with given fields: ctx
func (_m *Cache) Start(ctx context.Context) error {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for Start")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// WaitForCacheSync provides a mock function with given fields: ctx
func (_m *Cache) WaitForCacheSync(ctx context.Context) bool {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for WaitForCacheSync")
}
var r0 bool
if rf, ok := ret.Get(0).(func(context.Context) bool); ok {
r0 = rf(ctx)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// NewCache creates a new instance of Cache. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewCache(t interface {
mock.TestingT
Cleanup(func())
}) *Cache {
mock := &Cache{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -0,0 +1,324 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package datamover
import (
"context"
"encoding/json"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
cachetool "k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/kube"
apierrors "k8s.io/apimachinery/pkg/api/errors"
)
const (
dataUploadDownloadRequestor = "snapshot-data-upload-download"
)
// BackupMicroService process data mover backups inside the backup pod
type BackupMicroService struct {
ctx context.Context
client client.Client
kubeClient kubernetes.Interface
repoEnsurer *repository.Ensurer
credentialGetter *credentials.CredentialGetter
logger logrus.FieldLogger
dataPathMgr *datapath.Manager
eventRecorder kube.EventRecorder
namespace string
dataUploadName string
dataUpload *velerov2alpha1api.DataUpload
sourceTargetPath datapath.AccessPoint
resultSignal chan dataPathResult
duInformer cache.Informer
duHandler cachetool.ResourceEventHandlerRegistration
nodeName string
}
type dataPathResult struct {
err error
result string
}
func NewBackupMicroService(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, dataUploadName string, namespace string, nodeName string,
sourceTargetPath datapath.AccessPoint, dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, cred *credentials.CredentialGetter,
duInformer cache.Informer, log logrus.FieldLogger) *BackupMicroService {
return &BackupMicroService{
ctx: ctx,
client: client,
kubeClient: kubeClient,
credentialGetter: cred,
logger: log,
repoEnsurer: repoEnsurer,
dataPathMgr: dataPathMgr,
namespace: namespace,
dataUploadName: dataUploadName,
sourceTargetPath: sourceTargetPath,
nodeName: nodeName,
resultSignal: make(chan dataPathResult),
duInformer: duInformer,
}
}
func (r *BackupMicroService) Init() error {
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataUploadName, r.nodeName)
handler, err := r.duInformer.AddEventHandler(
cachetool.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
oldDu := oldObj.(*velerov2alpha1api.DataUpload)
newDu := newObj.(*velerov2alpha1api.DataUpload)
if newDu.Name != r.dataUpload.Name {
return
}
if newDu.Status.Phase != velerov2alpha1api.DataUploadPhaseInProgress {
return
}
if newDu.Spec.Cancel && !oldDu.Spec.Cancel {
r.cancelDataUpload(newDu)
}
},
},
)
if err != nil {
return errors.Wrap(err, "error adding du handler")
}
r.duHandler = handler
return err
}
var waitDuTimeout time.Duration = time.Minute * 2
func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string, error) {
log := r.logger.WithFields(logrus.Fields{
"dataupload": r.dataUploadName,
})
du := &velerov2alpha1api.DataUpload{}
err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitDuTimeout, true, func(ctx context.Context) (bool, error) {
err := r.client.Get(ctx, types.NamespacedName{
Namespace: r.namespace,
Name: r.dataUploadName,
}, du)
if apierrors.IsNotFound(err) {
return false, nil
}
if err != nil {
return true, errors.Wrapf(err, "error to get du %s", r.dataUploadName)
}
if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
return true, nil
} else {
return false, nil
}
})
if err != nil {
log.WithError(err).Error("Failed to wait du")
return "", errors.Wrap(err, "error waiting for du")
}
r.dataUpload = du
log.Info("Run cancelable dataUpload")
callbacks := datapath.Callbacks{
OnCompleted: r.OnDataUploadCompleted,
OnFailed: r.OnDataUploadFailed,
OnCancelled: r.OnDataUploadCancelled,
OnProgress: r.OnDataUploadProgress,
}
fsBackup, err := r.dataPathMgr.CreateFileSystemBR(du.Name, dataUploadDownloadRequestor, ctx, r.client, du.Namespace, callbacks, log)
if err != nil {
return "", errors.Wrap(err, "error to create data path")
}
log.Debug("Async fs br created")
if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{
BSLName: du.Spec.BackupStorageLocation,
SourceNamespace: du.Spec.SourceNamespace,
UploaderType: GetUploaderType(du.Spec.DataMover),
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
RepoIdentifier: "",
RepositoryEnsurer: r.repoEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {
return "", errors.Wrap(err, "error to initialize data path")
}
log.Info("Async fs br init")
tags := map[string]string{
velerov1api.AsyncOperationIDLabel: du.Labels[velerov1api.AsyncOperationIDLabel],
}
if err := fsBackup.StartBackup(r.sourceTargetPath, du.Spec.DataMoverConfig, &datapath.FSBRStartParam{
RealSource: GetRealSource(du.Spec.SourceNamespace, du.Spec.SourcePVC),
ParentSnapshot: "",
ForceFull: false,
Tags: tags,
}); err != nil {
return "", errors.Wrap(err, "error starting data path backup")
}
log.Info("Async fs backup data path started")
r.eventRecorder.Event(du, false, datapath.EventReasonStarted, "Data path for %s started", du.Name)
result := ""
select {
case <-ctx.Done():
err = errors.New("timed out waiting for fs backup to complete")
break
case res := <-r.resultSignal:
err = res.err
result = res.result
break
}
if err != nil {
log.WithError(err).Error("Async fs backup was not completed")
}
return result, err
}
func (r *BackupMicroService) Shutdown() {
r.eventRecorder.Shutdown()
r.closeDataPath(r.ctx, r.dataUploadName)
if r.duHandler != nil {
if err := r.duInformer.RemoveEventHandler(r.duHandler); err != nil {
r.logger.WithError(err).Warn("Failed to remove pod handler")
}
}
}
var funcMarshal = json.Marshal
func (r *BackupMicroService) OnDataUploadCompleted(ctx context.Context, namespace string, duName string, result datapath.Result) {
defer r.closeDataPath(ctx, duName)
log := r.logger.WithField("dataupload", duName)
backupBytes, err := funcMarshal(result.Backup)
if err != nil {
log.WithError(err).Errorf("Failed to marshal backup result %v", result.Backup)
r.resultSignal <- dataPathResult{
err: errors.Wrapf(err, "Failed to marshal backup result %v", result.Backup),
}
} else {
r.eventRecorder.Event(r.dataUpload, false, datapath.EventReasonCompleted, string(backupBytes))
r.resultSignal <- dataPathResult{
result: string(backupBytes),
}
}
log.Info("Async fs backup completed")
}
func (r *BackupMicroService) OnDataUploadFailed(ctx context.Context, namespace string, duName string, err error) {
defer r.closeDataPath(ctx, duName)
log := r.logger.WithField("dataupload", duName)
log.WithError(err).Error("Async fs backup data path failed")
r.eventRecorder.Event(r.dataUpload, false, datapath.EventReasonFailed, "Data path for data upload %s failed, error %v", r.dataUploadName, err)
r.resultSignal <- dataPathResult{
err: errors.Wrapf(err, "Data path for data upload %s failed", r.dataUploadName),
}
}
func (r *BackupMicroService) OnDataUploadCancelled(ctx context.Context, namespace string, duName string) {
defer r.closeDataPath(ctx, duName)
log := r.logger.WithField("dataupload", duName)
log.Warn("Async fs backup data path canceled")
r.eventRecorder.Event(r.dataUpload, false, datapath.EventReasonCancelled, "Data path for data upload %s canceled", duName)
r.resultSignal <- dataPathResult{
err: errors.New(datapath.ErrCancelled),
}
}
func (r *BackupMicroService) OnDataUploadProgress(ctx context.Context, namespace string, duName string, progress *uploader.Progress) {
log := r.logger.WithFields(logrus.Fields{
"dataupload": duName,
})
progressBytes, err := funcMarshal(progress)
if err != nil {
log.WithError(err).Errorf("Failed to marshal progress %v", progress)
return
}
log.Infof("Sending event for progress %v (%s)", progress, string(progressBytes))
r.eventRecorder.Event(r.dataUpload, false, datapath.EventReasonProgress, string(progressBytes))
}
func (r *BackupMicroService) closeDataPath(ctx context.Context, duName string) {
fsBackup := r.dataPathMgr.GetAsyncBR(duName)
if fsBackup != nil {
fsBackup.Close(ctx)
}
r.dataPathMgr.RemoveAsyncBR(duName)
}
func (r *BackupMicroService) cancelDataUpload(du *velerov2alpha1api.DataUpload) {
r.logger.WithField("DataUpload", du.Name).Info("Data upload is being canceled")
r.eventRecorder.Event(du, false, "Canceling", "Canceing for data upload %s", du.Name)
fsBackup := r.dataPathMgr.GetAsyncBR(du.Name)
if fsBackup == nil {
r.OnDataUploadCancelled(r.ctx, du.GetNamespace(), du.GetName())
} else {
fsBackup.Cancel()
}
}

View File

@@ -0,0 +1,441 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package datamover
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/runtime"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/uploader"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
)
type backupMsTestHelper struct {
eventReason string
eventMsg string
marshalErr error
marshalBytes []byte
withEvent bool
eventLock sync.Mutex
}
func (bt *backupMsTestHelper) Event(_ runtime.Object, _ bool, reason string, message string, a ...any) {
bt.eventLock.Lock()
defer bt.eventLock.Unlock()
bt.withEvent = true
bt.eventReason = reason
bt.eventMsg = fmt.Sprintf(message, a...)
}
func (bt *backupMsTestHelper) Shutdown() {}
func (bt *backupMsTestHelper) Marshal(v any) ([]byte, error) {
if bt.marshalErr != nil {
return nil, bt.marshalErr
}
return bt.marshalBytes, nil
}
func (bt *backupMsTestHelper) EventReason() string {
bt.eventLock.Lock()
defer bt.eventLock.Unlock()
return bt.eventReason
}
func (bt *backupMsTestHelper) EventMessage() string {
bt.eventLock.Lock()
defer bt.eventLock.Unlock()
return bt.eventMsg
}
func TestOnDataUploadFailed(t *testing.T) {
dataUploadName := "fake-data-upload"
bt := &backupMsTestHelper{}
bs := &BackupMicroService{
dataUploadName: dataUploadName,
dataPathMgr: datapath.NewManager(1),
eventRecorder: bt,
resultSignal: make(chan dataPathResult),
logger: velerotest.NewLogger(),
}
expectedErr := "Data path for data upload fake-data-upload failed: fake-error"
expectedEventReason := datapath.EventReasonFailed
expectedEventMsg := "Data path for data upload fake-data-upload failed, error fake-error"
go bs.OnDataUploadFailed(context.TODO(), velerov1api.DefaultNamespace, dataUploadName, errors.New("fake-error"))
result := <-bs.resultSignal
assert.EqualError(t, result.err, expectedErr)
assert.Equal(t, expectedEventReason, bt.EventReason())
assert.Equal(t, expectedEventMsg, bt.EventMessage())
}
func TestOnDataUploadCancelled(t *testing.T) {
dataUploadName := "fake-data-upload"
bt := &backupMsTestHelper{}
bs := &BackupMicroService{
dataUploadName: dataUploadName,
dataPathMgr: datapath.NewManager(1),
eventRecorder: bt,
resultSignal: make(chan dataPathResult),
logger: velerotest.NewLogger(),
}
expectedErr := datapath.ErrCancelled
expectedEventReason := datapath.EventReasonCancelled
expectedEventMsg := "Data path for data upload fake-data-upload canceled"
go bs.OnDataUploadCancelled(context.TODO(), velerov1api.DefaultNamespace, dataUploadName)
result := <-bs.resultSignal
assert.EqualError(t, result.err, expectedErr)
assert.Equal(t, expectedEventReason, bt.EventReason())
assert.Equal(t, expectedEventMsg, bt.EventMessage())
}
func TestOnDataUploadCompleted(t *testing.T) {
tests := []struct {
name string
expectedErr string
expectedEventReason string
expectedEventMsg string
marshalErr error
marshallStr string
}{
{
name: "marshal fail",
marshalErr: errors.New("fake-marshal-error"),
expectedErr: "Failed to marshal backup result { false { }}: fake-marshal-error",
},
{
name: "succeed",
marshallStr: "fake-complete-string",
expectedEventReason: datapath.EventReasonCompleted,
expectedEventMsg: "fake-complete-string",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
dataUploadName := "fake-data-upload"
bt := &backupMsTestHelper{
marshalErr: test.marshalErr,
marshalBytes: []byte(test.marshallStr),
}
bs := &BackupMicroService{
dataPathMgr: datapath.NewManager(1),
eventRecorder: bt,
resultSignal: make(chan dataPathResult),
logger: velerotest.NewLogger(),
}
funcMarshal = bt.Marshal
go bs.OnDataUploadCompleted(context.TODO(), velerov1api.DefaultNamespace, dataUploadName, datapath.Result{})
result := <-bs.resultSignal
if test.marshalErr != nil {
assert.EqualError(t, result.err, test.expectedErr)
} else {
assert.NoError(t, result.err)
assert.Equal(t, test.expectedEventReason, bt.EventReason())
assert.Equal(t, test.expectedEventMsg, bt.EventMessage())
}
})
}
}
func TestOnDataUploadProgress(t *testing.T) {
tests := []struct {
name string
expectedErr string
expectedEventReason string
expectedEventMsg string
marshalErr error
marshallStr string
}{
{
name: "marshal fail",
marshalErr: errors.New("fake-marshal-error"),
expectedErr: "Failed to marshal backup result",
},
{
name: "succeed",
marshallStr: "fake-progress-string",
expectedEventReason: datapath.EventReasonProgress,
expectedEventMsg: "fake-progress-string",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
dataUploadName := "fake-data-upload"
bt := &backupMsTestHelper{
marshalErr: test.marshalErr,
marshalBytes: []byte(test.marshallStr),
}
bs := &BackupMicroService{
dataPathMgr: datapath.NewManager(1),
eventRecorder: bt,
logger: velerotest.NewLogger(),
}
funcMarshal = bt.Marshal
bs.OnDataUploadProgress(context.TODO(), velerov1api.DefaultNamespace, dataUploadName, &uploader.Progress{})
if test.marshalErr != nil {
assert.False(t, bt.withEvent)
} else {
assert.True(t, bt.withEvent)
assert.Equal(t, test.expectedEventReason, bt.EventReason())
assert.Equal(t, test.expectedEventMsg, bt.EventMessage())
}
})
}
}
func TestCancelDataUpload(t *testing.T) {
tests := []struct {
name string
expectedEventReason string
expectedEventMsg string
expectedErr string
}{
{
name: "no fs backup",
expectedEventReason: datapath.EventReasonCancelled,
expectedEventMsg: "Data path for data upload fake-data-upload canceled",
expectedErr: datapath.ErrCancelled,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
dataUploadName := "fake-data-upload"
du := builder.ForDataUpload(velerov1api.DefaultNamespace, dataUploadName).Result()
bt := &backupMsTestHelper{}
bs := &BackupMicroService{
dataPathMgr: datapath.NewManager(1),
eventRecorder: bt,
resultSignal: make(chan dataPathResult),
logger: velerotest.NewLogger(),
}
go bs.cancelDataUpload(du)
result := <-bs.resultSignal
assert.EqualError(t, result.err, test.expectedErr)
assert.True(t, bt.withEvent)
assert.Equal(t, test.expectedEventReason, bt.EventReason())
assert.Equal(t, test.expectedEventMsg, bt.EventMessage())
})
}
}
func TestRunCancelableDataPath(t *testing.T) {
dataUploadName := "fake-data-upload"
du := builder.ForDataUpload(velerov1api.DefaultNamespace, dataUploadName).Phase(velerov2alpha1api.DataUploadPhaseNew).Result()
duInProgress := builder.ForDataUpload(velerov1api.DefaultNamespace, dataUploadName).Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result()
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second)
tests := []struct {
name string
ctx context.Context
result *dataPathResult
dataPathMgr *datapath.Manager
kubeClientObj []runtime.Object
initErr error
startErr error
dataPathStarted bool
expectedEventMsg string
expectedErr string
}{
{
name: "no du",
ctx: context.Background(),
expectedErr: "error waiting for du: context deadline exceeded",
},
{
name: "du not in in-progress",
ctx: context.Background(),
kubeClientObj: []runtime.Object{du},
expectedErr: "error waiting for du: context deadline exceeded",
},
{
name: "create data path fail",
ctx: context.Background(),
kubeClientObj: []runtime.Object{duInProgress},
dataPathMgr: datapath.NewManager(0),
expectedErr: "error to create data path: Concurrent number exceeds",
},
{
name: "init data path fail",
ctx: context.Background(),
kubeClientObj: []runtime.Object{duInProgress},
initErr: errors.New("fake-init-error"),
expectedErr: "error to initialize data path: fake-init-error",
},
{
name: "start data path fail",
ctx: context.Background(),
kubeClientObj: []runtime.Object{duInProgress},
startErr: errors.New("fake-start-error"),
expectedErr: "error starting data path backup: fake-start-error",
},
{
name: "data path timeout",
ctx: ctxTimeout,
kubeClientObj: []runtime.Object{duInProgress},
dataPathStarted: true,
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataUploadName),
expectedErr: "timed out waiting for fs backup to complete",
},
{
name: "data path returns error",
ctx: context.Background(),
kubeClientObj: []runtime.Object{duInProgress},
dataPathStarted: true,
result: &dataPathResult{
err: errors.New("fake-data-path-error"),
},
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataUploadName),
expectedErr: "fake-data-path-error",
},
{
name: "succeed",
ctx: context.Background(),
kubeClientObj: []runtime.Object{duInProgress},
dataPathStarted: true,
result: &dataPathResult{
result: "fake-succeed-result",
},
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataUploadName),
},
}
scheme := runtime.NewScheme()
velerov2alpha1api.AddToScheme(scheme)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeClientBuilder := clientFake.NewClientBuilder()
fakeClientBuilder = fakeClientBuilder.WithScheme(scheme)
fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()
bt := &backupMsTestHelper{}
bs := &BackupMicroService{
namespace: velerov1api.DefaultNamespace,
dataUploadName: dataUploadName,
ctx: context.Background(),
client: fakeClient,
dataPathMgr: datapath.NewManager(1),
eventRecorder: bt,
resultSignal: make(chan dataPathResult),
logger: velerotest.NewLogger(),
}
if test.ctx != nil {
bs.ctx = test.ctx
}
if test.dataPathMgr != nil {
bs.dataPathMgr = test.dataPathMgr
}
datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
fsBR := datapathmockes.NewAsyncBR(t)
if test.initErr != nil {
fsBR.On("Init", mock.Anything, mock.Anything).Return(test.initErr)
}
if test.startErr != nil {
fsBR.On("Init", mock.Anything, mock.Anything).Return(nil)
fsBR.On("StartBackup", mock.Anything, mock.Anything, mock.Anything).Return(test.startErr)
}
if test.dataPathStarted {
fsBR.On("Init", mock.Anything, mock.Anything).Return(nil)
fsBR.On("StartBackup", mock.Anything, mock.Anything, mock.Anything).Return(nil)
}
return fsBR
}
waitDuTimeout = time.Second
if test.result != nil {
go func() {
time.Sleep(time.Millisecond * 500)
bs.resultSignal <- *test.result
}()
}
result, err := bs.RunCancelableDataPath(test.ctx)
if test.expectedErr != "" {
assert.EqualError(t, err, test.expectedErr)
} else {
assert.NoError(t, err)
assert.Equal(t, test.result.result, result)
}
if test.expectedEventMsg != "" {
assert.True(t, bt.withEvent)
assert.Equal(t, test.expectedEventMsg, bt.EventMessage())
}
})
}
cancel()
}

74
pkg/util/kube/event.go Normal file
View File

@@ -0,0 +1,74 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube
import (
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
)
type EventRecorder interface {
Event(object runtime.Object, warning bool, reason string, message string, a ...any)
Shutdown()
}
type eventRecorder struct {
broadcaster record.EventBroadcaster
recorder record.EventRecorder
}
func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, eventSource string, eventNode string) EventRecorder {
res := eventRecorder{}
res.broadcaster = record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{
MaxEvents: 1,
MessageFunc: func(event *v1.Event) string {
return event.Message
},
})
res.broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
res.recorder = res.broadcaster.NewRecorder(scheme, v1.EventSource{
Component: eventSource,
Host: eventNode,
})
return &res
}
func (er *eventRecorder) Event(object runtime.Object, warning bool, reason string, message string, a ...any) {
eventType := v1.EventTypeNormal
if warning {
eventType = v1.EventTypeWarning
}
if len(a) > 0 {
er.recorder.Eventf(object, eventType, reason, message, a...)
} else {
er.recorder.Event(object, eventType, reason, message)
}
}
func (er *eventRecorder) Shutdown() {
// StartEventWatcher doesn't wait for writing all buffered events to API server when Shutdown is called, so have to hardcode a sleep time
time.Sleep(2 * time.Second)
er.broadcaster.Shutdown()
}

View File

@@ -0,0 +1,62 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"
corev1 "k8s.io/api/core/v1"
)
func TestEvent(t *testing.T) {
client := fake.NewSimpleClientset()
scheme := runtime.NewScheme()
err := corev1.AddToScheme(scheme)
require.NoError(t, err)
recorder := NewEventRecorder(client, scheme, "source-1", "fake-node")
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "fake-ns",
Name: "fake-pod",
UID: types.UID("fake-uid"),
},
Spec: corev1.PodSpec{
NodeName: "fake-node",
},
}
recorder.Event(pod, false, "Progress", "progress-1")
recorder.Event(pod, false, "Progress", "progress-2")
recorder.Shutdown()
items, err := client.CoreV1().Events("fake-ns").List(context.Background(), metav1.ListOptions{})
require.NoError(t, err)
assert.Len(t, items.Items, 1)
}