adding block mode to uploader/provider interfaces

Signed-off-by: Shawn Hurley <shawn@hurley.page>
This commit is contained in:
Shawn Hurley
2023-08-02 12:26:01 -04:00
parent 411bd54920
commit 563a16c10f
13 changed files with 132 additions and 23 deletions

View File

@@ -0,0 +1 @@
Add API support for volMode block, only error for now.

View File

@@ -133,9 +133,10 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, paren
if !fs.initialized {
return errors.New("file system data path is not initialized")
}
volMode := getPersistentVolumeMode(source)
go func() {
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, fs)
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, volMode, fs)
if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
@@ -154,8 +155,10 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro
return errors.New("file system data path is not initialized")
}
volMode := getPersistentVolumeMode(target)
go func() {
err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, fs)
err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, volMode, fs)
if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
@@ -169,6 +172,13 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro
return nil
}
func getPersistentVolumeMode(source AccessPoint) uploader.PersistentVolumeMode {
if source.ByBlock != "" {
return uploader.PersistentVolumeBlock
}
return uploader.PersistentVolumeFilesystem
}
// UpdateProgress which implement ProgressUpdater interface to update progress status
func (fs *fileSystemBR) UpdateProgress(p *uploader.Progress) {
if fs.callbacks.OnProgress != nil {

View File

@@ -95,7 +95,7 @@ func TestAsyncBackup(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
mockProvider := providerMock.NewProvider(t)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err)
fs.uploaderProv = mockProvider
fs.initialized = true
fs.callbacks = test.callbacks
@@ -178,7 +178,7 @@ func TestAsyncRestore(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
mockProvider := providerMock.NewProvider(t)
mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.err)
mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.err)
fs.uploaderProv = mockProvider
fs.initialized = true
fs.callbacks = test.callbacks

View File

@@ -52,7 +52,8 @@ type Callbacks struct {
// AccessPoint represents an access point that has been exposed to a data path instance
type AccessPoint struct {
ByPath string
ByPath string
ByBlock string
}
// AsyncBR is the interface for asynchronous data path methods

View File

@@ -88,10 +88,15 @@ func setupDefaultPolicy() *policy.Tree {
// Backup backup specific sourcePath and update progress
func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string,
forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
if fsUploader == nil {
return nil, false, errors.New("get empty kopia uploader")
}
if volMode == uploader.PersistentVolumeBlock {
return nil, false, errors.New("unable to handle block storage")
}
dir, err := filepath.Abs(sourcePath)
if err != nil {
return nil, false, errors.Wrapf(err, "Invalid source path '%s'", sourcePath)

View File

@@ -564,6 +564,7 @@ func TestBackup(t *testing.T) {
isSnapshotSourceError bool
expectedError error
expectedEmpty bool
volMode uploader.PersistentVolumeMode
}
manifest := &snapshot.Manifest{
ID: "test",
@@ -590,10 +591,20 @@ func TestBackup(t *testing.T) {
tags: nil,
expectedError: errors.New("Unable to read dir"),
},
{
name: "Unable to handle block mode",
sourcePath: "/",
tags: nil,
volMode: uploader.PersistentVolumeBlock,
expectedError: errors.New("unable to handle block storage"),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.volMode == "" {
tc.volMode = uploader.PersistentVolumeFilesystem
}
s := injectSnapshotFuncs()
args := []mockArgs{
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
@@ -616,9 +627,9 @@ func TestBackup(t *testing.T) {
var snapshotInfo *uploader.SnapshotInfo
var err error
if tc.isEmptyUploader {
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), nil, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.tags, &logrus.Logger{})
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), nil, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.volMode, tc.tags, &logrus.Logger{})
} else {
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), s.uploderMock, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.tags, &logrus.Logger{})
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), s.uploderMock, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.volMode, tc.tags, &logrus.Logger{})
}
// Check if the returned error matches the expected error
if tc.expectedError != nil {

View File

@@ -118,6 +118,7 @@ func (kp *kopiaProvider) RunBackup(
tags map[string]string,
forceFull bool,
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
updater uploader.ProgressUpdater) (string, bool, error) {
if updater == nil {
return "", false, errors.New("Need to initial backup progress updater first")
@@ -127,6 +128,11 @@ func (kp *kopiaProvider) RunBackup(
return "", false, errors.New("path is empty")
}
// For now, error on block mode
if volMode == uploader.PersistentVolumeBlock {
return "", false, errors.New("unable to currently support block mode")
}
log := kp.log.WithFields(logrus.Fields{
"path": path,
"realSource": realSource,
@@ -153,7 +159,7 @@ func (kp *kopiaProvider) RunBackup(
tags[uploader.SnapshotRequesterTag] = kp.requestorType
tags[uploader.SnapshotUploaderTag] = uploader.KopiaType
snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, tags, log)
snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, tags, log)
if err != nil {
if kpUploader.IsCanceled() {
log.Error("Kopia backup is canceled")
@@ -197,11 +203,17 @@ func (kp *kopiaProvider) RunRestore(
ctx context.Context,
snapshotID string,
volumePath string,
volMode uploader.PersistentVolumeMode,
updater uploader.ProgressUpdater) error {
log := kp.log.WithFields(logrus.Fields{
"snapshotID": snapshotID,
"volumePath": volumePath,
})
if volMode == uploader.PersistentVolumeBlock {
return errors.New("unable to currently support block mode")
}
repoWriter := kopia.NewShimRepo(kp.bkRepo)
progress := new(kopia.Progress)
progress.InitThrottle(restoreProgressCheckInterval)

View File

@@ -68,35 +68,47 @@ func TestRunBackup(t *testing.T) {
testCases := []struct {
name string
hookBackupFunc func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error)
hookBackupFunc func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error)
volMode uploader.PersistentVolumeMode
notError bool
}{
{
name: "success to backup",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return &uploader.SnapshotInfo{}, false, nil
},
notError: true,
},
{
name: "get error to backup",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return &uploader.SnapshotInfo{}, false, errors.New("failed to backup")
},
notError: false,
},
{
name: "got empty snapshot",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return nil, true, errors.New("snapshot is empty")
},
notError: false,
},
{
name: "error on vol mode",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return nil, true, nil
},
volMode: uploader.PersistentVolumeBlock,
notError: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.volMode == "" {
tc.volMode = uploader.PersistentVolumeFilesystem
}
BackupFunc = tc.hookBackupFunc
_, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", &updater)
_, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", tc.volMode, &updater)
if tc.notError {
assert.NoError(t, err)
} else {
@@ -115,6 +127,7 @@ func TestRunRestore(t *testing.T) {
name string
hookRestoreFunc func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error)
notError bool
volMode uploader.PersistentVolumeMode
}{
{
name: "normal restore",
@@ -130,12 +143,23 @@ func TestRunRestore(t *testing.T) {
},
notError: false,
},
{
name: "failed to restore block mode",
hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
return 0, 0, errors.New("failed to restore")
},
volMode: uploader.PersistentVolumeBlock,
notError: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.volMode == "" {
tc.volMode = uploader.PersistentVolumeFilesystem
}
RestoreFunc = tc.hookRestoreFunc
err := kp.RunRestore(context.Background(), "", "/var", &updater)
err := kp.RunRestore(context.Background(), "", "/var", tc.volMode, &updater)
if tc.notError {
assert.NoError(t, err)
} else {

View File

@@ -30,8 +30,8 @@ func (_m *Provider) Close(ctx context.Context) error {
}
// RunBackup provides a mock function with given fields: ctx, path, realSource, tags, forceFull, parentSnapshot, updater
func (_m *Provider) RunBackup(ctx context.Context, path string, realSource string, tags map[string]string, forceFull bool, parentSnapshot string, updater uploader.ProgressUpdater) (string, bool, error) {
ret := _m.Called(ctx, path, realSource, tags, forceFull, parentSnapshot, updater)
func (_m *Provider) RunBackup(ctx context.Context, path string, realSource string, tags map[string]string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, updater uploader.ProgressUpdater) (string, bool, error) {
ret := _m.Called(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, updater)
var r0 string
var r1 bool
@@ -61,8 +61,8 @@ func (_m *Provider) RunBackup(ctx context.Context, path string, realSource strin
}
// RunRestore provides a mock function with given fields: ctx, snapshotID, volumePath, updater
func (_m *Provider) RunRestore(ctx context.Context, snapshotID string, volumePath string, updater uploader.ProgressUpdater) error {
ret := _m.Called(ctx, snapshotID, volumePath, updater)
func (_m *Provider) RunRestore(ctx context.Context, snapshotID string, volumePath string, volMode uploader.PersistentVolumeMode, updater uploader.ProgressUpdater) error {
ret := _m.Called(ctx, snapshotID, volumePath, volMode, updater)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, uploader.ProgressUpdater) error); ok {

View File

@@ -48,6 +48,7 @@ type Provider interface {
tags map[string]string,
forceFull bool,
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
updater uploader.ProgressUpdater) (string, bool, error)
// RunRestore which will do restore for one specific volume with given snapshot id and return error
// updater is used for updating backup progress which implement by third-party
@@ -55,6 +56,7 @@ type Provider interface {
ctx context.Context,
snapshotID string,
volumePath string,
volMode uploader.PersistentVolumeMode,
updater uploader.ProgressUpdater) error
// Close which will close related repository
Close(ctx context.Context) error

View File

@@ -121,6 +121,7 @@ func (rp *resticProvider) RunBackup(
tags map[string]string,
forceFull bool,
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
updater uploader.ProgressUpdater) (string, bool, error) {
if updater == nil {
return "", false, errors.New("Need to initial backup progress updater first")
@@ -134,6 +135,10 @@ func (rp *resticProvider) RunBackup(
return "", false, errors.New("real source is not empty, this is not supported by restic uploader")
}
if volMode == uploader.PersistentVolumeBlock {
return "", false, errors.New("unable to support block mode")
}
log := rp.log.WithFields(logrus.Fields{
"path": path,
"parentSnapshot": parentSnapshot,
@@ -179,6 +184,7 @@ func (rp *resticProvider) RunRestore(
ctx context.Context,
snapshotID string,
volumePath string,
volMode uploader.PersistentVolumeMode,
updater uploader.ProgressUpdater) error {
if updater == nil {
return errors.New("Need to initial backup progress updater first")
@@ -188,6 +194,10 @@ func (rp *resticProvider) RunRestore(
"volumePath": volumePath,
})
if volMode == uploader.PersistentVolumeBlock {
return errors.New("unable to support block mode")
}
restoreCmd := resticRestoreCMDFunc(rp.repoIdentifier, rp.credentialsFile, snapshotID, volumePath)
restoreCmd.Env = rp.cmdEnv
restoreCmd.CACertFile = rp.caCertFile

View File

@@ -45,6 +45,7 @@ func TestResticRunBackup(t *testing.T) {
nilUpdater bool
parentSnapshot string
rp *resticProvider
volMode uploader.PersistentVolumeMode
hookBackupFunc func(string, string, string, map[string]string) *restic.Command
hookResticBackupFunc func(*restic.Command, logrus.FieldLogger, uploader.ProgressUpdater) (string, string, error)
hookResticGetSnapshotFunc func(string, string, map[string]string) *restic.Command
@@ -117,6 +118,14 @@ func TestResticRunBackup(t *testing.T) {
return strings.Contains(err.Error(), "failed to get snapshot id")
},
},
{
name: "failed to use block mode",
rp: &resticProvider{log: logrus.New(), extraFlags: []string{"testFlags"}},
volMode: uploader.PersistentVolumeBlock,
errorHandleFunc: func(err error) bool {
return strings.Contains(err.Error(), "unable to support block mode")
},
},
}
for _, tc := range testCases {
@@ -135,11 +144,14 @@ func TestResticRunBackup(t *testing.T) {
if tc.hookResticGetSnapshotIDFunc != nil {
resticGetSnapshotIDFunc = tc.hookResticGetSnapshotIDFunc
}
if tc.volMode == "" {
tc.volMode = uploader.PersistentVolumeFilesystem
}
if !tc.nilUpdater {
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: tc.rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()}
_, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, &updater)
_, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, &updater)
} else {
_, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, nil)
_, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, nil)
}
tc.rp.log.Infof("test name %v error %v", tc.name, err)
@@ -158,6 +170,7 @@ func TestResticRunRestore(t *testing.T) {
nilUpdater bool
hookResticRestoreFunc func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command
errorHandleFunc func(err error) bool
volMode uploader.PersistentVolumeMode
}{
{
name: "wrong restic execute command",
@@ -187,17 +200,28 @@ func TestResticRunRestore(t *testing.T) {
return strings.Contains(err.Error(), "error running command")
},
},
{
name: "error block volume mode",
rp: &resticProvider{log: logrus.New()},
errorHandleFunc: func(err error) bool {
return strings.Contains(err.Error(), "unable to support block mode")
},
volMode: uploader.PersistentVolumeBlock,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
resticRestoreCMDFunc = tc.hookResticRestoreFunc
if tc.volMode == "" {
tc.volMode = uploader.PersistentVolumeFilesystem
}
var err error
if !tc.nilUpdater {
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: tc.rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()}
err = tc.rp.RunRestore(context.Background(), "", "var", &updater)
err = tc.rp.RunRestore(context.Background(), "", "var", tc.volMode, &updater)
} else {
err = tc.rp.RunRestore(context.Background(), "", "var", nil)
err = tc.rp.RunRestore(context.Background(), "", "var", tc.volMode, nil)
}
tc.rp.log.Infof("test name %v error %v", tc.name, err)

View File

@@ -28,6 +28,15 @@ const (
SnapshotUploaderTag = "snapshot-uploader"
)
type PersistentVolumeMode string
const (
// PersistentVolumeBlock means the volume will not be formatted with a filesystem and will remain a raw block device.
PersistentVolumeBlock PersistentVolumeMode = "Block"
// PersistentVolumeFilesystem means the volume will be or is formatted with a filesystem.
PersistentVolumeFilesystem PersistentVolumeMode = "Filesystem"
)
// ValidateUploaderType validates if the input param is a valid uploader type.
// It will return an error if it's invalid.
func ValidateUploaderType(t string) error {