From 8742f1b1f33e1446a20132be5cb1bf666168581a Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Wed, 24 Jul 2024 15:41:26 +0800 Subject: [PATCH 1/2] data mover micro service backup Signed-off-by: Lyndon-Li --- changelogs/unreleased/8046-Lyndon-Li | 1 + pkg/cmd/cli/datamover/backup.go | 210 +++++++++- pkg/cmd/cli/datamover/backup_test.go | 216 ++++++++++ pkg/cmd/cli/datamover/data_mover.go | 7 + pkg/cmd/cli/datamover/mocks/Cache.go | 231 +++++++++++ pkg/datamover/backup_micro_service.go | 324 +++++++++++++++ pkg/datamover/backup_micro_service_test.go | 441 +++++++++++++++++++++ pkg/datapath/micro_service_watcher.go | 24 ++ pkg/util/kube/event.go | 74 ++++ pkg/util/kube/event_test.go | 62 +++ 10 files changed, 1581 insertions(+), 9 deletions(-) create mode 100644 changelogs/unreleased/8046-Lyndon-Li create mode 100644 pkg/cmd/cli/datamover/backup_test.go create mode 100644 pkg/cmd/cli/datamover/mocks/Cache.go create mode 100644 pkg/datamover/backup_micro_service.go create mode 100644 pkg/datamover/backup_micro_service_test.go create mode 100644 pkg/datapath/micro_service_watcher.go create mode 100644 pkg/util/kube/event.go create mode 100644 pkg/util/kube/event_test.go diff --git a/changelogs/unreleased/8046-Lyndon-Li b/changelogs/unreleased/8046-Lyndon-Li new file mode 100644 index 000000000..a3592d7e7 --- /dev/null +++ b/changelogs/unreleased/8046-Lyndon-Li @@ -0,0 +1 @@ +Data mover micro service backup according to design #7576 \ No newline at end of file diff --git a/pkg/cmd/cli/datamover/backup.go b/pkg/cmd/cli/datamover/backup.go index aabcd0acc..35f483d92 100644 --- a/pkg/cmd/cli/datamover/backup.go +++ b/pkg/cmd/cli/datamover/backup.go @@ -14,16 +14,39 @@ limitations under the License. package datamover import ( + "context" "fmt" + "os" "strings" "time" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/spf13/cobra" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "github.com/vmware-tanzu/velero/internal/credentials" "github.com/vmware-tanzu/velero/pkg/buildinfo" "github.com/vmware-tanzu/velero/pkg/client" + "github.com/vmware-tanzu/velero/pkg/cmd/util/signals" + "github.com/vmware-tanzu/velero/pkg/datamover" + "github.com/vmware-tanzu/velero/pkg/datapath" + "github.com/vmware-tanzu/velero/pkg/repository" + "github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/logging" + + ctrl "sigs.k8s.io/controller-runtime" + + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" + + ctlcache "sigs.k8s.io/controller-runtime/pkg/cache" + ctlclient "sigs.k8s.io/controller-runtime/pkg/client" ) type dataMoverBackupConfig struct { @@ -52,7 +75,10 @@ func NewBackupCommand(f client.Factory) *cobra.Command { logger.Infof("Starting Velero data-mover backup %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA()) f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name())) - s := newdataMoverBackup(logger, config) + s, err := newdataMoverBackup(logger, f, config) + if err != nil { + exitWithMessage(logger, false, "Failed to create data mover backup, %v", err) + } s.run() }, @@ -73,20 +99,186 @@ func NewBackupCommand(f client.Factory) *cobra.Command { return command } +const ( + // defaultCredentialsDirectory is the path on disk where credential + // files will be written to + 
defaultCredentialsDirectory = "/tmp/credentials" +) + type dataMoverBackup struct { - logger logrus.FieldLogger - config dataMoverBackupConfig + logger logrus.FieldLogger + ctx context.Context + cancelFunc context.CancelFunc + client ctlclient.Client + cache ctlcache.Cache + namespace string + nodeName string + config dataMoverBackupConfig + kubeClient kubernetes.Interface + dataPathMgr *datapath.Manager } -func newdataMoverBackup(logger logrus.FieldLogger, config dataMoverBackupConfig) *dataMoverBackup { - s := &dataMoverBackup{ - logger: logger, - config: config, +func newdataMoverBackup(logger logrus.FieldLogger, factory client.Factory, config dataMoverBackupConfig) (*dataMoverBackup, error) { + ctx, cancelFunc := context.WithCancel(context.Background()) + + clientConfig, err := factory.ClientConfig() + if err != nil { + cancelFunc() + return nil, errors.Wrap(err, "error to create client config") } - return s + ctrl.SetLogger(zap.New(zap.UseDevMode(true))) + + scheme := runtime.NewScheme() + if err := velerov1api.AddToScheme(scheme); err != nil { + cancelFunc() + return nil, errors.Wrap(err, "error to add velero v1 scheme") + } + + if err := velerov2alpha1api.AddToScheme(scheme); err != nil { + cancelFunc() + return nil, errors.Wrap(err, "error to add velero v2alpha1 scheme") + } + + if err := v1.AddToScheme(scheme); err != nil { + cancelFunc() + return nil, errors.Wrap(err, "error to add core v1 scheme") + } + + nodeName := os.Getenv("NODE_NAME") + + // use a field selector to filter to only pods scheduled on this node. + cacheOption := ctlcache.Options{ + Scheme: scheme, + ByObject: map[ctlclient.Object]ctlcache.ByObject{ + &v1.Pod{}: { + Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(), + }, + &velerov2alpha1api.DataUpload{}: { + Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(), + }, + }, + } + + cli, err := ctlclient.New(clientConfig, ctlclient.Options{ + Scheme: scheme, + }) + if err != nil { + cancelFunc() + return nil, errors.Wrap(err, "error to create client") + } + + cache, err := ctlcache.New(clientConfig, cacheOption) + if err != nil { + cancelFunc() + return nil, errors.Wrap(err, "error to create client cache") + } + + s := &dataMoverBackup{ + logger: logger, + ctx: ctx, + cancelFunc: cancelFunc, + client: cli, + cache: cache, + config: config, + namespace: factory.Namespace(), + nodeName: nodeName, + } + + s.kubeClient, err = factory.KubeClient() + if err != nil { + cancelFunc() + return nil, errors.Wrap(err, "error to create kube client") + } + + s.dataPathMgr = datapath.NewManager(1) + + return s, nil } +var funcExitWithMessage = exitWithMessage +var funcCreateDataPathService = (*dataMoverBackup).createDataPathService + func (s *dataMoverBackup) run() { - time.Sleep(time.Duration(1<<63 - 1)) + signals.CancelOnShutdown(s.cancelFunc, s.logger) + go func() { + if err := s.cache.Start(s.ctx); err != nil { + s.logger.WithError(err).Warn("error starting cache") + } + }() + + s.runDataPath() +} + +func (s *dataMoverBackup) runDataPath() { + s.logger.Infof("Starting micro service in node %s for du %s", s.nodeName, s.config.duName) + + dpService, err := funcCreateDataPathService(s) + if err != nil { + s.cancelFunc() + funcExitWithMessage(s.logger, false, "Failed to create data path service for DataUpload %s: %v", s.config.duName, err) + return + } + + s.logger.Infof("Starting data path service %s", s.config.duName) + + err = dpService.Init() + if err != nil { + s.cancelFunc() + funcExitWithMessage(s.logger, false, "Failed to init data path service 
for DataUpload %s: %v", s.config.duName, err) + return + } + + s.logger.Infof("Running data path service %s", s.config.duName) + + result, err := dpService.RunCancelableDataPath(s.ctx) + if err != nil { + s.cancelFunc() + funcExitWithMessage(s.logger, false, "Failed to run data path service for DataUpload %s: %v", s.config.duName, err) + return + } + + s.logger.WithField("du", s.config.duName).Info("Data path service completed") + + dpService.Shutdown() + + s.logger.WithField("du", s.config.duName).Info("Data path service is shut down") + + s.cancelFunc() + + funcExitWithMessage(s.logger, true, result) +} + +var funcNewCredentialFileStore = credentials.NewNamespacedFileStore +var funcNewCredentialSecretStore = credentials.NewNamespacedSecretStore + +func (s *dataMoverBackup) createDataPathService() (dataPathService, error) { + credentialFileStore, err := funcNewCredentialFileStore( + s.client, + s.namespace, + defaultCredentialsDirectory, + filesystem.NewFileSystem(), + ) + if err != nil { + return nil, errors.Wrapf(err, "error to create credential file store") + } + + credSecretStore, err := funcNewCredentialSecretStore(s.client, s.namespace) + if err != nil { + return nil, errors.Wrapf(err, "error to create credential secret store") + } + + credGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore} + + duInformer, err := s.cache.GetInformer(s.ctx, &velerov2alpha1api.DataUpload{}) + if err != nil { + return nil, errors.Wrap(err, "error to get controller-runtime informer from manager") + } + + repoEnsurer := repository.NewEnsurer(s.client, s.logger, s.config.resourceTimeout) + + return datamover.NewBackupMicroService(s.ctx, s.client, s.kubeClient, s.config.duName, s.namespace, s.nodeName, datapath.AccessPoint{ + ByPath: s.config.volumePath, + VolMode: uploader.PersistentVolumeMode(s.config.volumeMode), + }, s.dataPathMgr, repoEnsurer, credGetter, duInformer, s.logger), nil } diff --git a/pkg/cmd/cli/datamover/backup_test.go b/pkg/cmd/cli/datamover/backup_test.go new file mode 100644 index 000000000..2dd1e681d --- /dev/null +++ b/pkg/cmd/cli/datamover/backup_test.go @@ -0,0 +1,216 @@ +/* +Copyright The Velero Contributors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package datamover + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + ctlclient "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/vmware-tanzu/velero/internal/credentials" + cacheMock "github.com/vmware-tanzu/velero/pkg/cmd/cli/datamover/mocks" + velerotest "github.com/vmware-tanzu/velero/pkg/test" + "github.com/vmware-tanzu/velero/pkg/util/filesystem" +) + +func fakeCreateDataPathServiceWithErr(_ *dataMoverBackup) (dataPathService, error) { + return nil, errors.New("fake-create-data-path-error") +} + +var frHelper *fakeRunHelper + +func fakeCreateDataPathService(_ *dataMoverBackup) (dataPathService, error) { + return frHelper, nil +} + +type fakeRunHelper struct { + initErr error + runCancelableDataPathErr error + runCancelableDataPathResult string + exitMessage string + succeed bool +} + +func (fr *fakeRunHelper) Init() error { + return fr.initErr +} + +func (fr *fakeRunHelper) RunCancelableDataPath(_ context.Context) (string, error) { + if fr.runCancelableDataPathErr != nil { + return "", fr.runCancelableDataPathErr + } else { + return fr.runCancelableDataPathResult, nil + } +} + +func (fr *fakeRunHelper) Shutdown() { + +} + +func (fr *fakeRunHelper) ExitWithMessage(logger logrus.FieldLogger, succeed bool, message string, a ...any) { + fr.succeed = succeed + fr.exitMessage = fmt.Sprintf(message, a...) +} + +func TestRunDataPath(t *testing.T) { + tests := []struct { + name string + duName string + createDataPathFail bool + initDataPathErr error + runCancelableDataPathErr error + runCancelableDataPathResult string + expectedMessage string + expectedSucceed bool + }{ + { + name: "create data path failed", + duName: "fake-name", + createDataPathFail: true, + expectedMessage: "Failed to create data path service for DataUpload fake-name: fake-create-data-path-error", + }, + { + name: "init data path failed", + duName: "fake-name", + initDataPathErr: errors.New("fake-init-data-path-error"), + expectedMessage: "Failed to init data path service for DataUpload fake-name: fake-init-data-path-error", + }, + { + name: "run data path failed", + duName: "fake-name", + runCancelableDataPathErr: errors.New("fake-run-data-path-error"), + expectedMessage: "Failed to run data path service for DataUpload fake-name: fake-run-data-path-error", + }, + { + name: "succeed", + duName: "fake-name", + runCancelableDataPathResult: "fake-run-data-path-result", + expectedMessage: "fake-run-data-path-result", + expectedSucceed: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + frHelper = &fakeRunHelper{ + initErr: test.initDataPathErr, + runCancelableDataPathErr: test.runCancelableDataPathErr, + runCancelableDataPathResult: test.runCancelableDataPathResult, + } + + if test.createDataPathFail { + funcCreateDataPathService = fakeCreateDataPathServiceWithErr + } else { + funcCreateDataPathService = fakeCreateDataPathService + } + + funcExitWithMessage = frHelper.ExitWithMessage + + s := &dataMoverBackup{ + logger: velerotest.NewLogger(), + cancelFunc: func() {}, + config: dataMoverBackupConfig{ + duName: test.duName, + }, + } + + s.runDataPath() + + assert.Equal(t, test.expectedMessage, frHelper.exitMessage) + assert.Equal(t, test.expectedSucceed, frHelper.succeed) + }) + } +} + +type fakeCreateDataPathServiceHelper struct { + fileStoreErr error + secretStoreErr error +} + +func (fc *fakeCreateDataPathServiceHelper) NewNamespacedFileStore(_ 
ctlclient.Client, _ string, _ string, _ filesystem.Interface) (credentials.FileStore, error) { + return nil, fc.fileStoreErr +} + +func (fc *fakeCreateDataPathServiceHelper) NewNamespacedSecretStore(_ ctlclient.Client, _ string) (credentials.SecretStore, error) { + return nil, fc.secretStoreErr +} + +func TestCreateDataPathService(t *testing.T) { + tests := []struct { + name string + fileStoreErr error + secretStoreErr error + mockGetInformer bool + getInformerErr error + expectedError string + }{ + { + name: "create credential file store error", + fileStoreErr: errors.New("fake-file-store-error"), + expectedError: "error to create credential file store: fake-file-store-error", + }, + { + name: "create credential secret store", + secretStoreErr: errors.New("fake-secret-store-error"), + expectedError: "error to create credential secret store: fake-secret-store-error", + }, + { + name: "get informer error", + mockGetInformer: true, + getInformerErr: errors.New("fake-get-informer-error"), + expectedError: "error to get controller-runtime informer from manager: fake-get-informer-error", + }, + { + name: "succeed", + mockGetInformer: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fcHelper := &fakeCreateDataPathServiceHelper{ + fileStoreErr: test.fileStoreErr, + secretStoreErr: test.secretStoreErr, + } + + funcNewCredentialFileStore = fcHelper.NewNamespacedFileStore + funcNewCredentialSecretStore = fcHelper.NewNamespacedSecretStore + + cache := cacheMock.NewCache(t) + if test.mockGetInformer { + cache.On("GetInformer", mock.Anything, mock.Anything).Return(nil, test.getInformerErr) + } + + funcExitWithMessage = frHelper.ExitWithMessage + + s := &dataMoverBackup{ + cache: cache, + } + + _, err := s.createDataPathService() + + if test.expectedError != "" { + assert.EqualError(t, err, test.expectedError) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/pkg/cmd/cli/datamover/data_mover.go b/pkg/cmd/cli/datamover/data_mover.go index c4400a344..6786f4e7c 100644 --- a/pkg/cmd/cli/datamover/data_mover.go +++ b/pkg/cmd/cli/datamover/data_mover.go @@ -14,6 +14,7 @@ limitations under the License. package datamover import ( + "context" "fmt" "os" @@ -39,6 +40,12 @@ func NewCommand(f client.Factory) *cobra.Command { return command } +type dataPathService interface { + Init() error + RunCancelableDataPath(context.Context) (string, error) + Shutdown() +} + var funcExit = os.Exit var funcCreateFile = os.Create diff --git a/pkg/cmd/cli/datamover/mocks/Cache.go b/pkg/cmd/cli/datamover/mocks/Cache.go new file mode 100644 index 000000000..ac20ae0bd --- /dev/null +++ b/pkg/cmd/cli/datamover/mocks/Cache.go @@ -0,0 +1,231 @@ +// Code generated by mockery v2.39.1. DO NOT EDIT. + +package mocks + +import ( + cache "sigs.k8s.io/controller-runtime/pkg/cache" + client "sigs.k8s.io/controller-runtime/pkg/client" + + context "context" + + mock "github.com/stretchr/testify/mock" + + schema "k8s.io/apimachinery/pkg/runtime/schema" + + types "k8s.io/apimachinery/pkg/types" +) + +// Cache is an autogenerated mock type for the Cache type +type Cache struct { + mock.Mock +} + +// Get provides a mock function with given fields: ctx, key, obj, opts +func (_m *Cache) Get(ctx context.Context, key types.NamespacedName, obj client.Object, opts ...client.GetOption) error { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, key, obj) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) 
+ + if len(ret) == 0 { + panic("no return value specified for Get") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, types.NamespacedName, client.Object, ...client.GetOption) error); ok { + r0 = rf(ctx, key, obj, opts...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetInformer provides a mock function with given fields: ctx, obj, opts +func (_m *Cache) GetInformer(ctx context.Context, obj client.Object, opts ...cache.InformerGetOption) (cache.Informer, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, obj) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for GetInformer") + } + + var r0 cache.Informer + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, client.Object, ...cache.InformerGetOption) (cache.Informer, error)); ok { + return rf(ctx, obj, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, client.Object, ...cache.InformerGetOption) cache.Informer); ok { + r0 = rf(ctx, obj, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(cache.Informer) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, client.Object, ...cache.InformerGetOption) error); ok { + r1 = rf(ctx, obj, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetInformerForKind provides a mock function with given fields: ctx, gvk, opts +func (_m *Cache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...cache.InformerGetOption) (cache.Informer, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, gvk) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for GetInformerForKind") + } + + var r0 cache.Informer + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, schema.GroupVersionKind, ...cache.InformerGetOption) (cache.Informer, error)); ok { + return rf(ctx, gvk, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, schema.GroupVersionKind, ...cache.InformerGetOption) cache.Informer); ok { + r0 = rf(ctx, gvk, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(cache.Informer) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, schema.GroupVersionKind, ...cache.InformerGetOption) error); ok { + r1 = rf(ctx, gvk, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// IndexField provides a mock function with given fields: ctx, obj, field, extractValue +func (_m *Cache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error { + ret := _m.Called(ctx, obj, field, extractValue) + + if len(ret) == 0 { + panic("no return value specified for IndexField") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, client.Object, string, client.IndexerFunc) error); ok { + r0 = rf(ctx, obj, field, extractValue) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// List provides a mock function with given fields: ctx, list, opts +func (_m *Cache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, list) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) 
+ + if len(ret) == 0 { + panic("no return value specified for List") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, client.ObjectList, ...client.ListOption) error); ok { + r0 = rf(ctx, list, opts...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// RemoveInformer provides a mock function with given fields: ctx, obj +func (_m *Cache) RemoveInformer(ctx context.Context, obj client.Object) error { + ret := _m.Called(ctx, obj) + + if len(ret) == 0 { + panic("no return value specified for RemoveInformer") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, client.Object) error); ok { + r0 = rf(ctx, obj) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Start provides a mock function with given fields: ctx +func (_m *Cache) Start(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Start") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// WaitForCacheSync provides a mock function with given fields: ctx +func (_m *Cache) WaitForCacheSync(ctx context.Context) bool { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for WaitForCacheSync") + } + + var r0 bool + if rf, ok := ret.Get(0).(func(context.Context) bool); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// NewCache creates a new instance of Cache. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewCache(t interface { + mock.TestingT + Cleanup(func()) +}) *Cache { + mock := &Cache{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/datamover/backup_micro_service.go b/pkg/datamover/backup_micro_service.go new file mode 100644 index 000000000..2f4fa6e56 --- /dev/null +++ b/pkg/datamover/backup_micro_service.go @@ -0,0 +1,324 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package datamover + +import ( + "context" + "encoding/json" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/client" + + cachetool "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/cache" + + "github.com/vmware-tanzu/velero/internal/credentials" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" + "github.com/vmware-tanzu/velero/pkg/datapath" + "github.com/vmware-tanzu/velero/pkg/repository" + "github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/util/kube" + + apierrors "k8s.io/apimachinery/pkg/api/errors" +) + +const ( + dataUploadDownloadRequestor = "snapshot-data-upload-download" +) + +// BackupMicroService process data mover backups inside the backup pod +type BackupMicroService struct { + ctx context.Context + client client.Client + kubeClient kubernetes.Interface + repoEnsurer *repository.Ensurer + credentialGetter *credentials.CredentialGetter + logger logrus.FieldLogger + dataPathMgr *datapath.Manager + eventRecorder kube.EventRecorder + + namespace string + dataUploadName string + dataUpload *velerov2alpha1api.DataUpload + sourceTargetPath datapath.AccessPoint + + resultSignal chan dataPathResult + + duInformer cache.Informer + duHandler cachetool.ResourceEventHandlerRegistration + nodeName string +} + +type dataPathResult struct { + err error + result string +} + +func NewBackupMicroService(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, dataUploadName string, namespace string, nodeName string, + sourceTargetPath datapath.AccessPoint, dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, cred *credentials.CredentialGetter, + duInformer cache.Informer, log logrus.FieldLogger) *BackupMicroService { + return &BackupMicroService{ + ctx: ctx, + client: client, + kubeClient: kubeClient, + credentialGetter: cred, + logger: log, + repoEnsurer: repoEnsurer, + dataPathMgr: dataPathMgr, + namespace: namespace, + dataUploadName: dataUploadName, + sourceTargetPath: sourceTargetPath, + nodeName: nodeName, + resultSignal: make(chan dataPathResult), + duInformer: duInformer, + } +} + +func (r *BackupMicroService) Init() error { + r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataUploadName, r.nodeName) + + handler, err := r.duInformer.AddEventHandler( + cachetool.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj interface{}, newObj interface{}) { + oldDu := oldObj.(*velerov2alpha1api.DataUpload) + newDu := newObj.(*velerov2alpha1api.DataUpload) + + if newDu.Name != r.dataUpload.Name { + return + } + + if newDu.Status.Phase != velerov2alpha1api.DataUploadPhaseInProgress { + return + } + + if newDu.Spec.Cancel && !oldDu.Spec.Cancel { + r.cancelDataUpload(newDu) + } + }, + }, + ) + + if err != nil { + return errors.Wrap(err, "error adding du handler") + } + + r.duHandler = handler + + return err +} + +var waitDuTimeout time.Duration = time.Minute * 2 + +func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string, error) { + log := r.logger.WithFields(logrus.Fields{ + "dataupload": r.dataUploadName, + }) + + du := &velerov2alpha1api.DataUpload{} + err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitDuTimeout, true, func(ctx context.Context) (bool, error) { + err := r.client.Get(ctx, 
types.NamespacedName{ + Namespace: r.namespace, + Name: r.dataUploadName, + }, du) + + if apierrors.IsNotFound(err) { + return false, nil + } + + if err != nil { + return true, errors.Wrapf(err, "error to get du %s", r.dataUploadName) + } + + if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { + return true, nil + } else { + return false, nil + } + }) + + if err != nil { + log.WithError(err).Error("Failed to wait du") + return "", errors.Wrap(err, "error waiting for du") + } + + r.dataUpload = du + + log.Info("Run cancelable dataUpload") + + callbacks := datapath.Callbacks{ + OnCompleted: r.OnDataUploadCompleted, + OnFailed: r.OnDataUploadFailed, + OnCancelled: r.OnDataUploadCancelled, + OnProgress: r.OnDataUploadProgress, + } + + fsBackup, err := r.dataPathMgr.CreateFileSystemBR(du.Name, dataUploadDownloadRequestor, ctx, r.client, du.Namespace, callbacks, log) + if err != nil { + return "", errors.Wrap(err, "error to create data path") + } + + log.Debug("Async fs br created") + + if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{ + BSLName: du.Spec.BackupStorageLocation, + SourceNamespace: du.Spec.SourceNamespace, + UploaderType: GetUploaderType(du.Spec.DataMover), + RepositoryType: velerov1api.BackupRepositoryTypeKopia, + RepoIdentifier: "", + RepositoryEnsurer: r.repoEnsurer, + CredentialGetter: r.credentialGetter, + }); err != nil { + return "", errors.Wrap(err, "error to initialize data path") + } + + log.Info("Async fs br init") + + tags := map[string]string{ + velerov1api.AsyncOperationIDLabel: du.Labels[velerov1api.AsyncOperationIDLabel], + } + + if err := fsBackup.StartBackup(r.sourceTargetPath, du.Spec.DataMoverConfig, &datapath.FSBRStartParam{ + RealSource: GetRealSource(du.Spec.SourceNamespace, du.Spec.SourcePVC), + ParentSnapshot: "", + ForceFull: false, + Tags: tags, + }); err != nil { + return "", errors.Wrap(err, "error starting data path backup") + } + + log.Info("Async fs backup data path started") + r.eventRecorder.Event(du, false, datapath.EventReasonStarted, "Data path for %s started", du.Name) + + result := "" + select { + case <-ctx.Done(): + err = errors.New("timed out waiting for fs backup to complete") + break + case res := <-r.resultSignal: + err = res.err + result = res.result + break + } + + if err != nil { + log.WithError(err).Error("Async fs backup was not completed") + } + + return result, err +} + +func (r *BackupMicroService) Shutdown() { + r.eventRecorder.Shutdown() + r.closeDataPath(r.ctx, r.dataUploadName) + + if r.duHandler != nil { + if err := r.duInformer.RemoveEventHandler(r.duHandler); err != nil { + r.logger.WithError(err).Warn("Failed to remove pod handler") + } + } +} + +var funcMarshal = json.Marshal + +func (r *BackupMicroService) OnDataUploadCompleted(ctx context.Context, namespace string, duName string, result datapath.Result) { + defer r.closeDataPath(ctx, duName) + + log := r.logger.WithField("dataupload", duName) + + backupBytes, err := funcMarshal(result.Backup) + if err != nil { + log.WithError(err).Errorf("Failed to marshal backup result %v", result.Backup) + r.resultSignal <- dataPathResult{ + err: errors.Wrapf(err, "Failed to marshal backup result %v", result.Backup), + } + } else { + r.eventRecorder.Event(r.dataUpload, false, datapath.EventReasonCompleted, string(backupBytes)) + r.resultSignal <- dataPathResult{ + result: string(backupBytes), + } + } + + log.Info("Async fs backup completed") +} + +func (r *BackupMicroService) OnDataUploadFailed(ctx context.Context, namespace string, duName string, err error) { + 
defer r.closeDataPath(ctx, duName) + + log := r.logger.WithField("dataupload", duName) + log.WithError(err).Error("Async fs backup data path failed") + + r.eventRecorder.Event(r.dataUpload, false, datapath.EventReasonFailed, "Data path for data upload %s failed, error %v", r.dataUploadName, err) + r.resultSignal <- dataPathResult{ + err: errors.Wrapf(err, "Data path for data upload %s failed", r.dataUploadName), + } +} + +func (r *BackupMicroService) OnDataUploadCancelled(ctx context.Context, namespace string, duName string) { + defer r.closeDataPath(ctx, duName) + + log := r.logger.WithField("dataupload", duName) + log.Warn("Async fs backup data path canceled") + + r.eventRecorder.Event(r.dataUpload, false, datapath.EventReasonCancelled, "Data path for data upload %s canceled", duName) + r.resultSignal <- dataPathResult{ + err: errors.New(datapath.ErrCancelled), + } +} + +func (r *BackupMicroService) OnDataUploadProgress(ctx context.Context, namespace string, duName string, progress *uploader.Progress) { + log := r.logger.WithFields(logrus.Fields{ + "dataupload": duName, + }) + + progressBytes, err := funcMarshal(progress) + if err != nil { + log.WithError(err).Errorf("Failed to marshal progress %v", progress) + return + } + + log.Infof("Sending event for progress %v (%s)", progress, string(progressBytes)) + + r.eventRecorder.Event(r.dataUpload, false, datapath.EventReasonProgress, string(progressBytes)) +} + +func (r *BackupMicroService) closeDataPath(ctx context.Context, duName string) { + fsBackup := r.dataPathMgr.GetAsyncBR(duName) + if fsBackup != nil { + fsBackup.Close(ctx) + } + + r.dataPathMgr.RemoveAsyncBR(duName) +} + +func (r *BackupMicroService) cancelDataUpload(du *velerov2alpha1api.DataUpload) { + r.logger.WithField("DataUpload", du.Name).Info("Data upload is being canceled") + + r.eventRecorder.Event(du, false, "Canceling", "Canceing for data upload %s", du.Name) + + fsBackup := r.dataPathMgr.GetAsyncBR(du.Name) + if fsBackup == nil { + r.OnDataUploadCancelled(r.ctx, du.GetNamespace(), du.GetName()) + } else { + fsBackup.Cancel() + } +} diff --git a/pkg/datamover/backup_micro_service_test.go b/pkg/datamover/backup_micro_service_test.go new file mode 100644 index 000000000..8fd595e93 --- /dev/null +++ b/pkg/datamover/backup_micro_service_test.go @@ -0,0 +1,441 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package datamover + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/datapath" + "github.com/vmware-tanzu/velero/pkg/uploader" + + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + + clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake" + + velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" + velerotest "github.com/vmware-tanzu/velero/pkg/test" + + kbclient "sigs.k8s.io/controller-runtime/pkg/client" + + datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks" +) + +type backupMsTestHelper struct { + eventReason string + eventMsg string + marshalErr error + marshalBytes []byte + withEvent bool + eventLock sync.Mutex +} + +func (bt *backupMsTestHelper) Event(_ runtime.Object, _ bool, reason string, message string, a ...any) { + bt.eventLock.Lock() + defer bt.eventLock.Unlock() + + bt.withEvent = true + bt.eventReason = reason + bt.eventMsg = fmt.Sprintf(message, a...) +} +func (bt *backupMsTestHelper) Shutdown() {} + +func (bt *backupMsTestHelper) Marshal(v any) ([]byte, error) { + if bt.marshalErr != nil { + return nil, bt.marshalErr + } + + return bt.marshalBytes, nil +} + +func (bt *backupMsTestHelper) EventReason() string { + bt.eventLock.Lock() + defer bt.eventLock.Unlock() + + return bt.eventReason +} + +func (bt *backupMsTestHelper) EventMessage() string { + bt.eventLock.Lock() + defer bt.eventLock.Unlock() + + return bt.eventMsg +} + +func TestOnDataUploadFailed(t *testing.T) { + dataUploadName := "fake-data-upload" + bt := &backupMsTestHelper{} + + bs := &BackupMicroService{ + dataUploadName: dataUploadName, + dataPathMgr: datapath.NewManager(1), + eventRecorder: bt, + resultSignal: make(chan dataPathResult), + logger: velerotest.NewLogger(), + } + + expectedErr := "Data path for data upload fake-data-upload failed: fake-error" + expectedEventReason := datapath.EventReasonFailed + expectedEventMsg := "Data path for data upload fake-data-upload failed, error fake-error" + + go bs.OnDataUploadFailed(context.TODO(), velerov1api.DefaultNamespace, dataUploadName, errors.New("fake-error")) + + result := <-bs.resultSignal + assert.EqualError(t, result.err, expectedErr) + assert.Equal(t, expectedEventReason, bt.EventReason()) + assert.Equal(t, expectedEventMsg, bt.EventMessage()) +} + +func TestOnDataUploadCancelled(t *testing.T) { + dataUploadName := "fake-data-upload" + bt := &backupMsTestHelper{} + + bs := &BackupMicroService{ + dataUploadName: dataUploadName, + dataPathMgr: datapath.NewManager(1), + eventRecorder: bt, + resultSignal: make(chan dataPathResult), + logger: velerotest.NewLogger(), + } + + expectedErr := datapath.ErrCancelled + expectedEventReason := datapath.EventReasonCancelled + expectedEventMsg := "Data path for data upload fake-data-upload canceled" + + go bs.OnDataUploadCancelled(context.TODO(), velerov1api.DefaultNamespace, dataUploadName) + + result := <-bs.resultSignal + assert.EqualError(t, result.err, expectedErr) + assert.Equal(t, expectedEventReason, bt.EventReason()) + assert.Equal(t, expectedEventMsg, bt.EventMessage()) +} + +func TestOnDataUploadCompleted(t *testing.T) { + tests := []struct { + name string + expectedErr string + expectedEventReason string + expectedEventMsg string + marshalErr error + marshallStr string + }{ + { + 
name: "marshal fail", + marshalErr: errors.New("fake-marshal-error"), + expectedErr: "Failed to marshal backup result { false { }}: fake-marshal-error", + }, + { + name: "succeed", + marshallStr: "fake-complete-string", + expectedEventReason: datapath.EventReasonCompleted, + expectedEventMsg: "fake-complete-string", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + dataUploadName := "fake-data-upload" + + bt := &backupMsTestHelper{ + marshalErr: test.marshalErr, + marshalBytes: []byte(test.marshallStr), + } + + bs := &BackupMicroService{ + dataPathMgr: datapath.NewManager(1), + eventRecorder: bt, + resultSignal: make(chan dataPathResult), + logger: velerotest.NewLogger(), + } + + funcMarshal = bt.Marshal + + go bs.OnDataUploadCompleted(context.TODO(), velerov1api.DefaultNamespace, dataUploadName, datapath.Result{}) + + result := <-bs.resultSignal + if test.marshalErr != nil { + assert.EqualError(t, result.err, test.expectedErr) + } else { + assert.NoError(t, result.err) + assert.Equal(t, test.expectedEventReason, bt.EventReason()) + assert.Equal(t, test.expectedEventMsg, bt.EventMessage()) + } + }) + } +} + +func TestOnDataUploadProgress(t *testing.T) { + tests := []struct { + name string + expectedErr string + expectedEventReason string + expectedEventMsg string + marshalErr error + marshallStr string + }{ + { + name: "marshal fail", + marshalErr: errors.New("fake-marshal-error"), + expectedErr: "Failed to marshal backup result", + }, + { + name: "succeed", + marshallStr: "fake-progress-string", + expectedEventReason: datapath.EventReasonProgress, + expectedEventMsg: "fake-progress-string", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + dataUploadName := "fake-data-upload" + + bt := &backupMsTestHelper{ + marshalErr: test.marshalErr, + marshalBytes: []byte(test.marshallStr), + } + + bs := &BackupMicroService{ + dataPathMgr: datapath.NewManager(1), + eventRecorder: bt, + logger: velerotest.NewLogger(), + } + + funcMarshal = bt.Marshal + + bs.OnDataUploadProgress(context.TODO(), velerov1api.DefaultNamespace, dataUploadName, &uploader.Progress{}) + + if test.marshalErr != nil { + assert.False(t, bt.withEvent) + } else { + assert.True(t, bt.withEvent) + assert.Equal(t, test.expectedEventReason, bt.EventReason()) + assert.Equal(t, test.expectedEventMsg, bt.EventMessage()) + } + }) + } +} + +func TestCancelDataUpload(t *testing.T) { + tests := []struct { + name string + expectedEventReason string + expectedEventMsg string + expectedErr string + }{ + { + name: "no fs backup", + expectedEventReason: datapath.EventReasonCancelled, + expectedEventMsg: "Data path for data upload fake-data-upload canceled", + expectedErr: datapath.ErrCancelled, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + dataUploadName := "fake-data-upload" + du := builder.ForDataUpload(velerov1api.DefaultNamespace, dataUploadName).Result() + + bt := &backupMsTestHelper{} + + bs := &BackupMicroService{ + dataPathMgr: datapath.NewManager(1), + eventRecorder: bt, + resultSignal: make(chan dataPathResult), + logger: velerotest.NewLogger(), + } + + go bs.cancelDataUpload(du) + + result := <-bs.resultSignal + + assert.EqualError(t, result.err, test.expectedErr) + assert.True(t, bt.withEvent) + assert.Equal(t, test.expectedEventReason, bt.EventReason()) + assert.Equal(t, test.expectedEventMsg, bt.EventMessage()) + }) + } +} + +func TestRunCancelableDataPath(t *testing.T) { + dataUploadName := "fake-data-upload" + du := 
builder.ForDataUpload(velerov1api.DefaultNamespace, dataUploadName).Phase(velerov2alpha1api.DataUploadPhaseNew).Result() + duInProgress := builder.ForDataUpload(velerov1api.DefaultNamespace, dataUploadName).Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result() + ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second) + + tests := []struct { + name string + ctx context.Context + result *dataPathResult + dataPathMgr *datapath.Manager + kubeClientObj []runtime.Object + initErr error + startErr error + dataPathStarted bool + expectedEventMsg string + expectedErr string + }{ + { + name: "no du", + ctx: context.Background(), + expectedErr: "error waiting for du: context deadline exceeded", + }, + { + name: "du not in in-progress", + ctx: context.Background(), + kubeClientObj: []runtime.Object{du}, + expectedErr: "error waiting for du: context deadline exceeded", + }, + { + name: "create data path fail", + ctx: context.Background(), + kubeClientObj: []runtime.Object{duInProgress}, + dataPathMgr: datapath.NewManager(0), + expectedErr: "error to create data path: Concurrent number exceeds", + }, + { + name: "init data path fail", + ctx: context.Background(), + kubeClientObj: []runtime.Object{duInProgress}, + initErr: errors.New("fake-init-error"), + expectedErr: "error to initialize data path: fake-init-error", + }, + { + name: "start data path fail", + ctx: context.Background(), + kubeClientObj: []runtime.Object{duInProgress}, + startErr: errors.New("fake-start-error"), + expectedErr: "error starting data path backup: fake-start-error", + }, + { + name: "data path timeout", + ctx: ctxTimeout, + kubeClientObj: []runtime.Object{duInProgress}, + dataPathStarted: true, + expectedEventMsg: fmt.Sprintf("Data path for %s started", dataUploadName), + expectedErr: "timed out waiting for fs backup to complete", + }, + { + name: "data path returns error", + ctx: context.Background(), + kubeClientObj: []runtime.Object{duInProgress}, + dataPathStarted: true, + result: &dataPathResult{ + err: errors.New("fake-data-path-error"), + }, + expectedEventMsg: fmt.Sprintf("Data path for %s started", dataUploadName), + expectedErr: "fake-data-path-error", + }, + { + name: "succeed", + ctx: context.Background(), + kubeClientObj: []runtime.Object{duInProgress}, + dataPathStarted: true, + result: &dataPathResult{ + result: "fake-succeed-result", + }, + expectedEventMsg: fmt.Sprintf("Data path for %s started", dataUploadName), + }, + } + + scheme := runtime.NewScheme() + velerov2alpha1api.AddToScheme(scheme) + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeClientBuilder := clientFake.NewClientBuilder() + fakeClientBuilder = fakeClientBuilder.WithScheme(scheme) + + fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build() + + bt := &backupMsTestHelper{} + + bs := &BackupMicroService{ + namespace: velerov1api.DefaultNamespace, + dataUploadName: dataUploadName, + ctx: context.Background(), + client: fakeClient, + dataPathMgr: datapath.NewManager(1), + eventRecorder: bt, + resultSignal: make(chan dataPathResult), + logger: velerotest.NewLogger(), + } + + if test.ctx != nil { + bs.ctx = test.ctx + } + + if test.dataPathMgr != nil { + bs.dataPathMgr = test.dataPathMgr + } + + datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { + fsBR := datapathmockes.NewAsyncBR(t) + if test.initErr != nil { + fsBR.On("Init", mock.Anything, mock.Anything).Return(test.initErr) + } + + if 
test.startErr != nil { + fsBR.On("Init", mock.Anything, mock.Anything).Return(nil) + fsBR.On("StartBackup", mock.Anything, mock.Anything, mock.Anything).Return(test.startErr) + } + + if test.dataPathStarted { + fsBR.On("Init", mock.Anything, mock.Anything).Return(nil) + fsBR.On("StartBackup", mock.Anything, mock.Anything, mock.Anything).Return(nil) + } + + return fsBR + } + + waitDuTimeout = time.Second + + if test.result != nil { + go func() { + time.Sleep(time.Millisecond * 500) + bs.resultSignal <- *test.result + }() + } + + result, err := bs.RunCancelableDataPath(test.ctx) + + if test.expectedErr != "" { + assert.EqualError(t, err, test.expectedErr) + } else { + assert.NoError(t, err) + assert.Equal(t, test.result.result, result) + } + + if test.expectedEventMsg != "" { + assert.True(t, bt.withEvent) + assert.Equal(t, test.expectedEventMsg, bt.EventMessage()) + } + }) + } + + cancel() +} diff --git a/pkg/datapath/micro_service_watcher.go b/pkg/datapath/micro_service_watcher.go new file mode 100644 index 000000000..ed941880c --- /dev/null +++ b/pkg/datapath/micro_service_watcher.go @@ -0,0 +1,24 @@ +/* +Copyright The Velero Contributors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package datapath + +const ( + ErrCancelled = "data path is canceled" + + EventReasonStarted = "Data-Path-Started" + EventReasonCompleted = "Data-Path-Completed" + EventReasonFailed = "Data-Path-Failed" + EventReasonCancelled = "Data-Path-Canceled" + EventReasonProgress = "Data-Path-Progress" +) diff --git a/pkg/util/kube/event.go b/pkg/util/kube/event.go new file mode 100644 index 000000000..d5a4bb8d2 --- /dev/null +++ b/pkg/util/kube/event.go @@ -0,0 +1,74 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+package kube
+
+import (
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/kubernetes"
+	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/record"
+)
+
+type EventRecorder interface {
+	Event(object runtime.Object, warning bool, reason string, message string, a ...any)
+	Shutdown()
+}
+
+type eventRecorder struct {
+	broadcaster record.EventBroadcaster
+	recorder    record.EventRecorder
+}
+
+func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, eventSource string, eventNode string) EventRecorder {
+	res := eventRecorder{}
+
+	res.broadcaster = record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{
+		MaxEvents: 1,
+		MessageFunc: func(event *v1.Event) string {
+			return event.Message
+		},
+	})
+
+	res.broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
+	res.recorder = res.broadcaster.NewRecorder(scheme, v1.EventSource{
+		Component: eventSource,
+		Host:      eventNode,
+	})
+
+	return &res
+}
+
+func (er *eventRecorder) Event(object runtime.Object, warning bool, reason string, message string, a ...any) {
+	eventType := v1.EventTypeNormal
+	if warning {
+		eventType = v1.EventTypeWarning
+	}
+
+	if len(a) > 0 {
+		er.recorder.Eventf(object, eventType, reason, message, a...)
+	} else {
+		er.recorder.Event(object, eventType, reason, message)
+	}
+}
+
+func (er *eventRecorder) Shutdown() {
+	// StartEventWatcher doesn't wait for writing all buffered events to API server when Shutdown is called, so have to hardcode a sleep time
+	time.Sleep(2 * time.Second)
+	er.broadcaster.Shutdown()
+}
diff --git a/pkg/util/kube/event_test.go b/pkg/util/kube/event_test.go
new file mode 100644
index 000000000..538142596
--- /dev/null
+++ b/pkg/util/kube/event_test.go
@@ -0,0 +1,62 @@
+/*
+Copyright The Velero Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kube
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/kubernetes/fake"
+
+	corev1 "k8s.io/api/core/v1"
+)
+
+func TestEvent(t *testing.T) {
+	client := fake.NewSimpleClientset()
+
+	scheme := runtime.NewScheme()
+	err := corev1.AddToScheme(scheme)
+	require.NoError(t, err)
+
+	recorder := NewEventRecorder(client, scheme, "source-1", "fake-node")
+
+	pod := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: "fake-ns",
+			Name:      "fake-pod",
+			UID:       types.UID("fake-uid"),
+		},
+		Spec: corev1.PodSpec{
+			NodeName: "fake-node",
+		},
+	}
+
+	recorder.Event(pod, false, "Progress", "progress-1")
+	recorder.Event(pod, false, "Progress", "progress-2")
+
+	recorder.Shutdown()
+
+	items, err := client.CoreV1().Events("fake-ns").List(context.Background(), metav1.ListOptions{})
+	require.NoError(t, err)
+
+	assert.Len(t, items.Items, 1)
+}

From 6997b8e39354f0f5ecff94aa98a8c026827e5dd4 Mon Sep 17 00:00:00 2001
From: Lyndon-Li
Date: Fri, 26 Jul 2024 13:54:02 +0800
Subject: [PATCH 2/2] data mover micro service backup

Signed-off-by: Lyndon-Li
---
 pkg/cmd/cli/datamover/backup.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pkg/cmd/cli/datamover/backup.go b/pkg/cmd/cli/datamover/backup.go
index 35f483d92..2027b0893 100644
--- a/pkg/cmd/cli/datamover/backup.go
+++ b/pkg/cmd/cli/datamover/backup.go
@@ -207,7 +207,8 @@ func (s *dataMoverBackup) run() {
 		}
 	}()
 
-	s.runDataPath()
+	// TODOOO: call s.runDataPath()
+	time.Sleep(time.Duration(1<<63 - 1))
 }
 
 func (s *dataMoverBackup) runDataPath() {
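Note on the event flow added by this series: BackupMicroService reports progress and its final result as Kubernetes Events recorded against the DataUpload object (the reasons are the EventReason* constants in pkg/datapath/micro_service_watcher.go), and NewEventRecorder builds its broadcaster with CorrelatorOptions{MaxEvents: 1}, so repeated events for the same object and reason are aggregated into a single Event whose message tracks the latest payload rather than being stored individually — which is why TestEvent above expects exactly one Event after two Event() calls. The node-agent side watcher that consumes these events is not part of this patch; the snippet below is only a rough, standalone sketch of how such a consumer could read back the recorded events with plain client-go calls. The namespace and DataUpload name are placeholders, and this helper is illustrative, not Velero API.

// Illustrative sketch only (not part of this patch): list the Events that the
// backup micro service records for a DataUpload.
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Placeholder values; a real consumer would know which DataUpload it is watching.
	namespace, duName := "velero", "example-du"

	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Events are recorded with the DataUpload as the involved object, so filter on its name.
	events, err := client.CoreV1().Events(namespace).List(context.Background(), metav1.ListOptions{
		FieldSelector: "involvedObject.name=" + duName,
	})
	if err != nil {
		panic(err)
	}

	for _, ev := range events.Items {
		// Reason is one of the datapath.EventReason* constants (e.g. "Data-Path-Progress",
		// "Data-Path-Completed"); Message carries the JSON-marshaled progress or result payload.
		fmt.Printf("%s: %s\n", ev.Reason, ev.Message)
	}
}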