add shared generic data path

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
Lyndon-Li
2023-04-27 11:19:03 +08:00
parent a8a17d725a
commit 623da51494
22 changed files with 1327 additions and 325 deletions

View File

@@ -94,7 +94,7 @@ func setupDefaultPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceIn
// Backup backs up the specific sourcePath and updates progress
func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string,
parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
if fsUploader == nil {
return nil, false, errors.New("get empty kopia uploader")
}
@@ -122,7 +122,7 @@ func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter rep
}
kopiaCtx := logging.SetupKopiaLog(ctx, log)
snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, rootDir, parentSnapshot, log, "Kopia Uploader")
snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, rootDir, forceFull, parentSnapshot, tags, log, "Kopia Uploader")
if err != nil {
return nil, false, err
}
@@ -170,7 +170,9 @@ func SnapshotSource(
u SnapshotUploader,
sourceInfo snapshot.SourceInfo,
rootDir fs.Entry,
forceFull bool,
parentSnapshot string,
snapshotTags map[string]string,
log logrus.FieldLogger,
description string,
) (string, int64, error) {
@@ -178,21 +180,24 @@ func SnapshotSource(
snapshotStartTime := time.Now()
var previous []*snapshot.Manifest
if parentSnapshot != "" {
mani, err := loadSnapshotFunc(ctx, rep, manifest.ID(parentSnapshot))
if err != nil {
return "", 0, errors.Wrapf(err, "Failed to load previous snapshot %v from kopia", parentSnapshot)
}
if !forceFull {
if parentSnapshot != "" {
mani, err := loadSnapshotFunc(ctx, rep, manifest.ID(parentSnapshot))
if err != nil {
return "", 0, errors.Wrapf(err, "Failed to load previous snapshot %v from kopia", parentSnapshot)
}
previous = append(previous, mani)
} else {
pre, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo, nil)
if err != nil {
return "", 0, errors.Wrapf(err, "Failed to find previous kopia snapshot manifests for si %v", sourceInfo)
}
previous = append(previous, mani)
} else {
pre, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo, snapshotTags, nil)
if err != nil {
return "", 0, errors.Wrapf(err, "Failed to find previous kopia snapshot manifests for si %v", sourceInfo)
}
previous = pre
previous = pre
}
}
var manifest *snapshot.Manifest
if err := setupDefaultPolicy(ctx, rep, sourceInfo); err != nil {
return "", 0, errors.Wrapf(err, "unable to set policy for si %v", sourceInfo)
@@ -208,6 +213,8 @@ func SnapshotSource(
return "", 0, errors.Wrapf(err, "Failed to upload the kopia snapshot for si %v", sourceInfo)
}
manifest.Tags = snapshotTags
manifest.Description = description
if _, err = saveSnapshotFunc(ctx, rep, manifest); err != nil {
@@ -247,7 +254,7 @@ func reportSnapshotStatus(manifest *snapshot.Manifest, policyTree *policy.Tree)
// findPreviousSnapshotManifest returns the list of previous snapshots for a given source, including
// the last complete snapshot following it.
func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sourceInfo snapshot.SourceInfo, noLaterThan *time.Time) ([]*snapshot.Manifest, error) {
func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sourceInfo snapshot.SourceInfo, snapshotTags map[string]string, noLaterThan *time.Time) ([]*snapshot.Manifest, error) {
man, err := snapshot.ListSnapshots(ctx, rep, sourceInfo)
if err != nil {
return nil, err
@@ -257,6 +264,15 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour
var result []*snapshot.Manifest
for _, p := range man {
requestor, found := p.Tags[uploader.SnapshotRequestorTag]
if !found {
continue
}
if requestor != snapshotTags[uploader.SnapshotRequestorTag] {
continue
}
if noLaterThan != nil && p.StartTime.After(*noLaterThan) {
continue
}
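
Editor's note: the new snapshotTags parameter lets findPreviousSnapshotManifest skip manifests produced by a different requestor, so an incremental backup only chains to parents created by the same data path. A self-contained sketch of that filter, assuming only the tag key shown in this diff ("snapshot-requestor"); the requestor values are illustrative:

    package main

    import (
        "fmt"
        "time"
    )

    const snapshotRequestorTag = "snapshot-requestor" // matches uploader.SnapshotRequestorTag

    type manifest struct {
        Tags      map[string]string
        StartTime time.Time
    }

    // filterByRequestor keeps only manifests whose requestor tag matches the one
    // carried in snapshotTags, mirroring the loop added to findPreviousSnapshotManifest.
    func filterByRequestor(manifests []manifest, snapshotTags map[string]string, noLaterThan *time.Time) []manifest {
        var result []manifest
        for _, p := range manifests {
            requestor, found := p.Tags[snapshotRequestorTag]
            if !found || requestor != snapshotTags[snapshotRequestorTag] {
                continue
            }
            if noLaterThan != nil && p.StartTime.After(*noLaterThan) {
                continue
            }
            result = append(result, p)
        }
        return result
    }

    func main() {
        manifests := []manifest{
            {Tags: map[string]string{snapshotRequestorTag: "pod-volume-backup"}},
            {Tags: map[string]string{snapshotRequestorTag: "data-upload"}},
            {Tags: nil}, // legacy snapshot without the tag is skipped
        }
        matched := filterByRequestor(manifests, map[string]string{snapshotRequestorTag: "data-upload"}, nil)
        fmt.Println(len(matched)) // prints: 1
    }
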

View File

@@ -186,7 +186,7 @@ func TestSnapshotSource(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
s := injectSnapshotFuncs()
MockFuncs(s, tc.args)
_, _, err = SnapshotSource(ctx, s.repoWriterMock, s.uploderMock, sourceInfo, rootDir, "/", log, "TestSnapshotSource")
_, _, err = SnapshotSource(ctx, s.repoWriterMock, s.uploderMock, sourceInfo, rootDir, false, "/", nil, log, "TestSnapshotSource")
if tc.notError {
assert.NoError(t, err)
} else {

View File

@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"strings"
"sync/atomic"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"
@@ -41,21 +42,25 @@ var RestoreFunc = kopia.Restore
// kopiaProvider holds the info related to the kopia uploader provider
type kopiaProvider struct {
bkRepo udmrepo.BackupRepo
credGetter *credentials.CredentialGetter
log logrus.FieldLogger
requestorType string
bkRepo udmrepo.BackupRepo
credGetter *credentials.CredentialGetter
log logrus.FieldLogger
canceling int32
}
// NewKopiaUploaderProvider initializes the provider by opening or creating a repository
func NewKopiaUploaderProvider(
requestorType string,
ctx context.Context,
credGetter *credentials.CredentialGetter,
backupRepo *velerov1api.BackupRepository,
log logrus.FieldLogger,
) (Provider, error) {
kp := &kopiaProvider{
log: log,
credGetter: credGetter,
requestorType: requestorType,
log: log,
credGetter: credGetter,
}
// repoUID is used to generate the kopia repository config with a unique directory path
repoUID := string(backupRepo.GetUID())
@@ -85,6 +90,8 @@ func (kp *kopiaProvider) CheckContext(ctx context.Context, finishChan chan struc
kp.log.Infof("Action finished")
return
case <-ctx.Done():
atomic.StoreInt32(&kp.canceling, 1)
if uploader != nil {
uploader.Cancel()
kp.log.Infof("Backup is been canceled")
@@ -107,6 +114,7 @@ func (kp *kopiaProvider) RunBackup(
ctx context.Context,
path string,
tags map[string]string,
forceFull bool,
parentSnapshot string,
updater uploader.ProgressUpdater) (string, bool, error) {
if updater == nil {
@@ -132,9 +140,19 @@ func (kp *kopiaProvider) RunBackup(
close(quit)
}()
snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, parentSnapshot, log)
if tags == nil {
tags = make(map[string]string)
}
tags[uploader.SnapshotRequestorTag] = kp.requestorType
snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, forceFull, parentSnapshot, tags, log)
if err != nil {
return "", false, errors.Wrapf(err, "Failed to run kopia backup")
if kpUploader.IsCanceled() {
log.Error("Kopia backup is canceled")
return "", false, ErrorCanceled
} else {
return "", false, errors.Wrapf(err, "Failed to run kopia backup")
}
} else if isSnapshotEmpty {
log.Debugf("Kopia backup got empty dir with path %s", path)
return "", true, nil
@@ -177,28 +195,33 @@ func (kp *kopiaProvider) RunRestore(
"volumePath": volumePath,
})
repoWriter := kopia.NewShimRepo(kp.bkRepo)
prorgess := new(kopia.Progress)
prorgess.InitThrottle(restoreProgressCheckInterval)
prorgess.Updater = updater
progress := new(kopia.Progress)
progress.InitThrottle(restoreProgressCheckInterval)
progress.Updater = updater
restoreCancel := make(chan struct{})
quit := make(chan struct{})
log.Info("Starting restore")
go kp.CheckContext(ctx, quit, restoreCancel, nil)
defer func() {
if restoreCancel != nil {
close(restoreCancel)
}
close(quit)
}()
size, fileCount, err := RestoreFunc(ctx, repoWriter, prorgess, snapshotID, volumePath, log, restoreCancel)
go kp.CheckContext(ctx, quit, restoreCancel, nil)
// We use the cancel channel to control the restore cancel, so don't pass a context with cancel to Kopia restore.
// Otherwise, Kopia restore will not respond to the cancel control but will return an arbitrary error.
// Kopia restore cancellation is not designed as well as Kopia backup, which uses the context to control backup cancellation all the way.
size, fileCount, err := RestoreFunc(context.Background(), repoWriter, progress, snapshotID, volumePath, log, restoreCancel)
if err != nil {
return errors.Wrapf(err, "Failed to run kopia restore")
}
if atomic.LoadInt32(&kp.canceling) == 1 {
log.Error("Kopia restore is canceled")
return ErrorCanceled
}
// Update the progress so that the TotalBytes statistic equals BytesDone when finished
updater.UpdateProgress(&uploader.Progress{
TotalBytes: size,
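
Editor's note: the comment above explains the design choice — the restore is driven by a dedicated cancel channel rather than a cancelable context, and the atomic canceling flag recorded by CheckContext is what turns a stop into ErrorCanceled instead of an arbitrary kopia error. A minimal standard-library sketch of that channel-plus-flag pattern (loosely modeled on CheckContext; names and timing are illustrative):

    package main

    import (
        "context"
        "fmt"
        "sync/atomic"
        "time"
    )

    // checkContext loosely mirrors kp.CheckContext: when the caller's context is
    // done, record the canceling flag and close the restore cancel channel.
    func checkContext(ctx context.Context, quit, restoreCancel chan struct{}, canceling *int32) {
        select {
        case <-quit:
        case <-ctx.Done():
            atomic.StoreInt32(canceling, 1)
            close(restoreCancel)
        }
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        restoreCancel := make(chan struct{})
        quit := make(chan struct{})
        var canceling int32

        go checkContext(ctx, quit, restoreCancel, &canceling)

        cancel() // simulate the caller canceling the restore

        // The "restore" only watches restoreCancel, never ctx, as described above.
        select {
        case <-restoreCancel:
            fmt.Println("restore stopped, canceled:", atomic.LoadInt32(&canceling) == 1)
        case <-time.After(time.Second):
            fmt.Println("restore finished without cancellation")
        }
        close(quit)
    }
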

View File

@@ -40,26 +40,26 @@ func TestRunBackup(t *testing.T) {
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()}
testCases := []struct {
name string
hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error)
hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error)
notError bool
}{
{
name: "success to backup",
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return &uploader.SnapshotInfo{}, false, nil
},
notError: true,
},
{
name: "get error to backup",
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return &uploader.SnapshotInfo{}, false, errors.New("failed to backup")
},
notError: false,
},
{
name: "got empty snapshot",
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return nil, true, errors.New("snapshot is empty")
},
notError: false,
@@ -68,7 +68,7 @@ func TestRunBackup(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
BackupFunc = tc.hookBackupFunc
_, _, err := kp.RunBackup(context.Background(), "var", nil, "", &updater)
_, _, err := kp.RunBackup(context.Background(), "var", nil, false, "", &updater)
if tc.notError {
assert.NoError(t, err)
} else {

View File

@@ -0,0 +1,90 @@
// Code generated by mockery v2.22.1. DO NOT EDIT.
package mocks
import (
context "context"
mock "github.com/stretchr/testify/mock"
uploader "github.com/vmware-tanzu/velero/pkg/uploader"
)
// Provider is an autogenerated mock type for the Provider type
type Provider struct {
mock.Mock
}
// Close provides a mock function with given fields: ctx
func (_m *Provider) Close(ctx context.Context) error {
ret := _m.Called(ctx)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// RunBackup provides a mock function with given fields: ctx, path, tags, forceFull, parentSnapshot, updater
func (_m *Provider) RunBackup(ctx context.Context, path string, tags map[string]string, forceFull bool, parentSnapshot string, updater uploader.ProgressUpdater) (string, bool, error) {
ret := _m.Called(ctx, path, tags, forceFull, parentSnapshot, updater)
var r0 string
var r1 bool
var r2 error
if rf, ok := ret.Get(0).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) (string, bool, error)); ok {
return rf(ctx, path, tags, forceFull, parentSnapshot, updater)
}
if rf, ok := ret.Get(0).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) string); ok {
r0 = rf(ctx, path, tags, forceFull, parentSnapshot, updater)
} else {
r0 = ret.Get(0).(string)
}
if rf, ok := ret.Get(1).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) bool); ok {
r1 = rf(ctx, path, tags, forceFull, parentSnapshot, updater)
} else {
r1 = ret.Get(1).(bool)
}
if rf, ok := ret.Get(2).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) error); ok {
r2 = rf(ctx, path, tags, forceFull, parentSnapshot, updater)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// RunRestore provides a mock function with given fields: ctx, snapshotID, volumePath, updater
func (_m *Provider) RunRestore(ctx context.Context, snapshotID string, volumePath string, updater uploader.ProgressUpdater) error {
ret := _m.Called(ctx, snapshotID, volumePath, updater)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, uploader.ProgressUpdater) error); ok {
r0 = rf(ctx, snapshotID, volumePath, updater)
} else {
r0 = ret.Error(0)
}
return r0
}
type mockConstructorTestingTNewProvider interface {
mock.TestingT
Cleanup(func())
}
// NewProvider creates a new instance of Provider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewProvider(t mockConstructorTestingTNewProvider) *Provider {
mock := &Provider{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
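
Editor's note: callers of Provider can program this generated mock with testify's On/Return, including the new forceFull argument. A hedged usage sketch; the mocks import path and the volume path are assumptions, not taken from this commit:

    package mocks_test

    import (
        "context"
        "testing"

        "github.com/stretchr/testify/mock"
        "github.com/stretchr/testify/require"

        // Assumed location of the generated package; adjust to the actual path in the repo.
        "github.com/vmware-tanzu/velero/pkg/uploader/provider/mocks"
    )

    func TestProviderMockRunBackup(t *testing.T) {
        p := mocks.NewProvider(t)

        // Program the expectation for the new signature, including forceFull.
        p.On("RunBackup", mock.Anything, "/host_pods/volume", mock.Anything, false, "", mock.Anything).
            Return("snap-id", false, nil)

        snapshotID, emptySnapshot, err := p.RunBackup(context.Background(), "/host_pods/volume", nil, false, "", nil)
        require.NoError(t, err)
        require.False(t, emptySnapshot)
        require.Equal(t, "snap-id", snapshotID)
    }
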

View File

@@ -29,14 +29,14 @@ import (
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/repository/provider"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)
const restoreProgressCheckInterval = 10 * time.Second
const backupProgressCheckInterval = 10 * time.Second
var ErrorCanceled error = errors.New("uploader is canceled")
// Provider is designed to do the backup or restore for one pod volume
type Provider interface {
// RunBackup backs up one specific volume and returns snapshotID, isSnapshotEmpty, and error
@@ -45,6 +45,7 @@ type Provider interface {
ctx context.Context,
path string,
tags map[string]string,
forceFull bool,
parentSnapshot string,
updater uploader.ProgressUpdater) (string, bool, error)
// RunRestore restores one specific volume from the given snapshot ID and returns an error
@@ -63,6 +64,7 @@ func NewUploaderProvider(
ctx context.Context,
client client.Client,
uploaderType string,
requestorType string,
repoIdentifier string,
bsl *velerov1api.BackupStorageLocation,
backupRepo *velerov1api.BackupRepository,
@@ -70,20 +72,16 @@ func NewUploaderProvider(
repoKeySelector *v1.SecretKeySelector,
log logrus.FieldLogger,
) (Provider, error) {
if requestorType == "" {
return nil, errors.New("requestor type is empty")
}
if credGetter.FromFile == nil {
return nil, errors.New("uninitialized FileStore credentail is not supported")
}
if uploaderType == uploader.KopiaType {
// We use the hardcoded repositoryType velerov1api.BackupRepositoryTypeKopia for now, because we have only one implementation of unified repo.
// TODO: post v1.10, replace the hardcode. In the future, when we have multiple implementations of Unified Repo (besides Kopia), we will add the
// repositoryType to BSL, because by then we will not be able to hardcode the repositoryType to BackupRepositoryTypeKopia for Unified Repo.
if err := provider.NewUnifiedRepoProvider(*credGetter, velerov1api.BackupRepositoryTypeKopia, log).ConnectToRepo(ctx, provider.RepoParam{BackupLocation: bsl, BackupRepo: backupRepo}); err != nil {
return nil, errors.Wrap(err, "failed to connect repository")
}
return NewKopiaUploaderProvider(ctx, credGetter, backupRepo, log)
return NewKopiaUploaderProvider(requestorType, ctx, credGetter, backupRepo, log)
} else {
return NewResticUploaderProvider(repoIdentifier, bsl, credGetter, repoKeySelector, log)
}
if err := provider.NewResticRepositoryProvider(credGetter.FromFile, filesystem.NewFileSystem(), log).ConnectToRepo(ctx, provider.RepoParam{BackupLocation: bsl, BackupRepo: backupRepo}); err != nil {
return nil, errors.Wrap(err, "failed to connect repository")
}
return NewResticUploaderProvider(repoIdentifier, bsl, credGetter, repoKeySelector, log)
}
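
Editor's note: NewUploaderProvider now rejects an empty requestorType before dispatching to the kopia or restic provider. A condensed, stand-alone sketch of that validation and dispatch (newUploaderProvider and the constructor callbacks are simplified stand-ins, not the real signature):

    package main

    import (
        "errors"
        "fmt"
    )

    const (
        resticType = "restic"
        kopiaType  = "kopia"
    )

    // newUploaderProvider condenses the dispatch above: validate the new
    // requestorType argument, then pick the kopia or restic constructor.
    func newUploaderProvider(uploaderType, requestorType string,
        newKopia func(requestorType string) (string, error),
        newRestic func() (string, error)) (string, error) {
        if requestorType == "" {
            return "", errors.New("requestor type is empty")
        }
        if uploaderType == kopiaType {
            return newKopia(requestorType)
        }
        return newRestic()
    }

    func main() {
        kopia := func(r string) (string, error) { return "kopia provider for " + r, nil }
        restic := func() (string, error) { return "restic provider", nil }

        p, _ := newUploaderProvider(kopiaType, "pod-volume-backup", kopia, restic)
        fmt.Println(p)

        _, err := newUploaderProvider(resticType, "", kopia, restic)
        fmt.Println(err) // requestor type is empty
    }
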

View File

@@ -113,6 +113,7 @@ func (rp *resticProvider) RunBackup(
ctx context.Context,
path string,
tags map[string]string,
forceFull bool,
parentSnapshot string,
updater uploader.ProgressUpdater) (string, bool, error) {
if updater == nil {

View File

@@ -64,7 +64,7 @@ func TestResticRunBackup(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ResticBackupCMDFunc = tc.hookBackupFunc
_, _, err := rp.RunBackup(context.Background(), "var", nil, "", &updater)
_, _, err := rp.RunBackup(context.Background(), "var", nil, false, "", &updater)
rp.log.Infof("test name %v error %v", tc.name, err)
require.Equal(t, true, tc.errorHandleFunc(err))
})

View File

@@ -22,8 +22,9 @@ import (
)
const (
ResticType = "restic"
KopiaType = "kopia"
ResticType = "restic"
KopiaType = "kopia"
SnapshotRequestorTag = "snapshot-requestor"
)
// ValidateUploaderType validates if the input param is a valid uploader type.