diff --git a/changelogs/unreleased/5221-qiuming-best b/changelogs/unreleased/5221-qiuming-best new file mode 100644 index 000000000..2de71e91a --- /dev/null +++ b/changelogs/unreleased/5221-qiuming-best @@ -0,0 +1 @@ +Uploader Implementation: Kopia backup and restore diff --git a/go.sum b/go.sum index 58f63b5f2..9ec412ef4 100644 --- a/go.sum +++ b/go.sum @@ -547,6 +547,7 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kubernetes-csi/external-snapshotter/client/v4 v4.2.0 h1:nHHjmvjitIiyPlUHk/ofpgvBcNcawJLtf4PYHORLjAA= github.com/kubernetes-csi/external-snapshotter/client/v4 v4.2.0/go.mod h1:YBCo4DoEeDndqvAn6eeu0vWM7QdXmHEeI9cFWplmBys= github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE= diff --git a/pkg/apis/velero/v1/pod_volume_operation_progress.go b/pkg/apis/velero/v1/pod_volume_operation_progress.go index ceb67a87e..4461a7e7a 100644 --- a/pkg/apis/velero/v1/pod_volume_operation_progress.go +++ b/pkg/apis/velero/v1/pod_volume_operation_progress.go @@ -16,6 +16,13 @@ limitations under the License. package v1 +import ( + "context" + + "github.com/sirupsen/logrus" + "sigs.k8s.io/controller-runtime/pkg/client" +) + // PodVolumeOperationProgress represents the progress of a // PodVolumeBackup/Restore (restic) operation type PodVolumeOperationProgress struct { @@ -25,3 +32,51 @@ type PodVolumeOperationProgress struct { // +optional BytesDone int64 `json:"bytesDone,omitempty"` } + +type BackupProgressUpdater struct { + pvb *PodVolumeBackup + log logrus.FieldLogger + ctx context.Context + cli client.Client +} + +type RestoreProgressUpdater struct { + pvr *PodVolumeRestore + log logrus.FieldLogger + ctx context.Context + cli client.Client +} + +func NewBackupProgressUpdater(pvb *PodVolumeBackup, log logrus.FieldLogger, ctx context.Context, cli client.Client) *BackupProgressUpdater { + return &BackupProgressUpdater{pvb, log, ctx, cli} +} + +//UpdateProgress which implement ProgressUpdater to update pvb progress status +func (b *BackupProgressUpdater) UpdateProgress(p *UploaderProgress) { + original := b.pvb.DeepCopy() + b.pvb.Status.Progress = PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone} + if b.cli == nil { + b.log.Errorf("failed to update backup pod %s volume %s progress with uninitailize client", b.pvb.Spec.Pod.Name, b.pvb.Spec.Volume) + return + } + if err := b.cli.Patch(b.ctx, b.pvb, client.MergeFrom(original)); err != nil { + b.log.Errorf("update backup pod %s volume %s progress with %v", b.pvb.Spec.Pod.Name, b.pvb.Spec.Volume, err) + } +} + +func NewRestoreProgressUpdater(pvr *PodVolumeRestore, log logrus.FieldLogger, ctx context.Context, cli client.Client) *RestoreProgressUpdater { + return &RestoreProgressUpdater{pvr, log, ctx, cli} +} + +//UpdateProgress which implement ProgressUpdater to update update pvb progress status +func (r *RestoreProgressUpdater) UpdateProgress(p *UploaderProgress) { + original := r.pvr.DeepCopy() + r.pvr.Status.Progress = PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone} + if r.cli == nil { + r.log.Errorf("failed to update restore pod %s volume %s progress with uninitailize client", r.pvr.Spec.Pod.Name, r.pvr.Spec.Volume) + return + } + if err := r.cli.Patch(r.ctx, r.pvr, client.MergeFrom(original)); err != nil { + r.log.Errorf("update restore pod %s volume %s progress with %v", r.pvr.Spec.Pod.Name, r.pvr.Spec.Volume, err) + } +} diff --git a/pkg/apis/velero/v1/progress.go b/pkg/apis/velero/v1/progress.go new file mode 100644 index 000000000..cf5d42b1d --- /dev/null +++ b/pkg/apis/velero/v1/progress.go @@ -0,0 +1,26 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +type UploaderProgress struct { + TotalBytes int64 `json:"totalBytes,omitempty"` + BytesDone int64 `json:"doneBytes,omitempty"` +} + +type ProgressUpdater interface { + UpdateProgress(p *UploaderProgress) +} diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index abec6d601..3b2118926 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -264,18 +264,6 @@ func (r *PodVolumeBackupReconciler) getParentSnapshot(ctx context.Context, log l return mostRecentPVB.Status.SnapshotID } -// updateBackupProgressFunc returns a func that takes progress info and patches -// the PVB with the new progress. -func (r *PodVolumeBackupReconciler) updateBackupProgressFunc(pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) { - return func(progress velerov1api.PodVolumeOperationProgress) { - original := pvb.DeepCopy() - pvb.Status.Progress = progress - if err := r.Client.Patch(context.Background(), pvb, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("error update progress") - } - } -} - func (r *PodVolumeBackupReconciler) updateStatusToFailed(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) { original := pvb.DeepCopy() pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index 2b81d363a..70cf1a53d 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -317,15 +317,3 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve return nil } - -// updateRestoreProgressFunc returns a func that takes progress info and patches -// the PVR with the new progress -func (c *PodVolumeRestoreReconciler) updateRestoreProgressFunc(req *velerov1api.PodVolumeRestore, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) { - return func(progress velerov1api.PodVolumeOperationProgress) { - original := req.DeepCopy() - req.Status.Progress = progress - if err := c.Patch(context.Background(), req, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("Unable to update PodVolumeRestore progress") - } - } -} diff --git a/pkg/repository/mocks/repository_writer.go b/pkg/repository/mocks/repository_writer.go new file mode 100644 index 000000000..c3e9964be --- /dev/null +++ b/pkg/repository/mocks/repository_writer.go @@ -0,0 +1,349 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mocks + +import ( + context "context" + + index "github.com/kopia/kopia/repo/content/index" + manifest "github.com/kopia/kopia/repo/manifest" + + mock "github.com/stretchr/testify/mock" + + object "github.com/kopia/kopia/repo/object" + + repo "github.com/kopia/kopia/repo" + + time "time" +) + +// RepositoryWriter is an autogenerated mock type for the RepositoryWriter type +type RepositoryWriter struct { + mock.Mock +} + +// ClientOptions provides a mock function with given fields: +func (_m *RepositoryWriter) ClientOptions() repo.ClientOptions { + ret := _m.Called() + + var r0 repo.ClientOptions + if rf, ok := ret.Get(0).(func() repo.ClientOptions); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(repo.ClientOptions) + } + + return r0 +} + +// Close provides a mock function with given fields: ctx +func (_m *RepositoryWriter) Close(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ContentInfo provides a mock function with given fields: ctx, contentID +func (_m *RepositoryWriter) ContentInfo(ctx context.Context, contentID index.ID) (index.Info, error) { + ret := _m.Called(ctx, contentID) + + var r0 index.Info + if rf, ok := ret.Get(0).(func(context.Context, index.ID) index.Info); ok { + r0 = rf(ctx, contentID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(index.Info) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, index.ID) error); ok { + r1 = rf(ctx, contentID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DeleteManifest provides a mock function with given fields: ctx, id +func (_m *RepositoryWriter) DeleteManifest(ctx context.Context, id manifest.ID) error { + ret := _m.Called(ctx, id) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, manifest.ID) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// FindManifests provides a mock function with given fields: ctx, labels +func (_m *RepositoryWriter) FindManifests(ctx context.Context, labels map[string]string) ([]*manifest.EntryMetadata, error) { + ret := _m.Called(ctx, labels) + + var r0 []*manifest.EntryMetadata + if rf, ok := ret.Get(0).(func(context.Context, map[string]string) []*manifest.EntryMetadata); ok { + r0 = rf(ctx, labels) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*manifest.EntryMetadata) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, map[string]string) error); ok { + r1 = rf(ctx, labels) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Flush provides a mock function with given fields: ctx +func (_m *RepositoryWriter) Flush(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetManifest provides a mock function with given fields: ctx, id, data +func (_m *RepositoryWriter) GetManifest(ctx context.Context, id manifest.ID, data interface{}) (*manifest.EntryMetadata, error) { + ret := _m.Called(ctx, id, data) + + var r0 *manifest.EntryMetadata + if rf, ok := ret.Get(0).(func(context.Context, manifest.ID, interface{}) *manifest.EntryMetadata); ok { + r0 = rf(ctx, id, data) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*manifest.EntryMetadata) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, manifest.ID, interface{}) error); ok { + r1 = rf(ctx, id, data) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewObjectWriter provides a mock function with given fields: ctx, opt +func (_m *RepositoryWriter) NewObjectWriter(ctx context.Context, opt object.WriterOptions) object.Writer { + ret := _m.Called(ctx, opt) + + var r0 object.Writer + if rf, ok := ret.Get(0).(func(context.Context, object.WriterOptions) object.Writer); ok { + r0 = rf(ctx, opt) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(object.Writer) + } + } + + return r0 +} + +// NewWriter provides a mock function with given fields: ctx, opt +func (_m *RepositoryWriter) NewWriter(ctx context.Context, opt repo.WriteSessionOptions) (context.Context, repo.RepositoryWriter, error) { + ret := _m.Called(ctx, opt) + + var r0 context.Context + if rf, ok := ret.Get(0).(func(context.Context, repo.WriteSessionOptions) context.Context); ok { + r0 = rf(ctx, opt) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(context.Context) + } + } + + var r1 repo.RepositoryWriter + if rf, ok := ret.Get(1).(func(context.Context, repo.WriteSessionOptions) repo.RepositoryWriter); ok { + r1 = rf(ctx, opt) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(repo.RepositoryWriter) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(context.Context, repo.WriteSessionOptions) error); ok { + r2 = rf(ctx, opt) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// OpenObject provides a mock function with given fields: ctx, id +func (_m *RepositoryWriter) OpenObject(ctx context.Context, id object.ID) (object.Reader, error) { + ret := _m.Called(ctx, id) + + var r0 object.Reader + if rf, ok := ret.Get(0).(func(context.Context, object.ID) object.Reader); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(object.Reader) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, object.ID) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// PrefetchContents provides a mock function with given fields: ctx, contentIDs, hint +func (_m *RepositoryWriter) PrefetchContents(ctx context.Context, contentIDs []index.ID, hint string) []index.ID { + ret := _m.Called(ctx, contentIDs, hint) + + var r0 []index.ID + if rf, ok := ret.Get(0).(func(context.Context, []index.ID, string) []index.ID); ok { + r0 = rf(ctx, contentIDs, hint) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]index.ID) + } + } + + return r0 +} + +// PrefetchObjects provides a mock function with given fields: ctx, objectIDs, hint +func (_m *RepositoryWriter) PrefetchObjects(ctx context.Context, objectIDs []object.ID, hint string) ([]index.ID, error) { + ret := _m.Called(ctx, objectIDs, hint) + + var r0 []index.ID + if rf, ok := ret.Get(0).(func(context.Context, []object.ID, string) []index.ID); ok { + r0 = rf(ctx, objectIDs, hint) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]index.ID) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, []object.ID, string) error); ok { + r1 = rf(ctx, objectIDs, hint) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// PutManifest provides a mock function with given fields: ctx, labels, payload +func (_m *RepositoryWriter) PutManifest(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error) { + ret := _m.Called(ctx, labels, payload) + + var r0 manifest.ID + if rf, ok := ret.Get(0).(func(context.Context, map[string]string, interface{}) manifest.ID); ok { + r0 = rf(ctx, labels, payload) + } else { + r0 = ret.Get(0).(manifest.ID) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, map[string]string, interface{}) error); ok { + r1 = rf(ctx, labels, payload) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Refresh provides a mock function with given fields: ctx +func (_m *RepositoryWriter) Refresh(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Time provides a mock function with given fields: +func (_m *RepositoryWriter) Time() time.Time { + ret := _m.Called() + + var r0 time.Time + if rf, ok := ret.Get(0).(func() time.Time); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Time) + } + + return r0 +} + +// UpdateDescription provides a mock function with given fields: d +func (_m *RepositoryWriter) UpdateDescription(d string) { + _m.Called(d) +} + +// VerifyObject provides a mock function with given fields: ctx, id +func (_m *RepositoryWriter) VerifyObject(ctx context.Context, id object.ID) ([]index.ID, error) { + ret := _m.Called(ctx, id) + + var r0 []index.ID + if rf, ok := ret.Get(0).(func(context.Context, object.ID) []index.ID); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]index.ID) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, object.ID) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/pkg/uploader/kopia/progress.go b/pkg/uploader/kopia/progress.go index 050d190dc..7da518eb5 100644 --- a/pkg/uploader/kopia/progress.go +++ b/pkg/uploader/kopia/progress.go @@ -20,7 +20,7 @@ import ( "sync/atomic" "time" - "github.com/vmware-tanzu/velero/pkg/uploader" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" ) //Throttle throttles controlle the interval of output result @@ -60,9 +60,9 @@ type KopiaProgress struct { estimatedFileCount int32 // +checklocksignore the total count of files to be processed estimatedTotalBytes int64 // +checklocksignore the total size of files to be processed // +checkatomic - processedBytes int64 // which statistic all bytes has been processed currently - outputThrottle Throttle // which control the frequency of update progress - UpFunc func(uploader.UploaderProgress) //which called by UpdateProgress func, it is used to update pvb or pvr status + processedBytes int64 // which statistic all bytes has been processed currently + outputThrottle Throttle // which control the frequency of update progress + Updater velerov1api.ProgressUpdater //which the kopia progress will call the UpdateProgress, the third party will implement the interface to update progress } //UploadedBytes the total bytes has uploaded currently @@ -93,10 +93,7 @@ func (p *KopiaProgress) EstimatedDataSize(fileCount int, totalBytes int64) { //UpdateProgress which called by UpdateProgress func, it is used to update pvb or pvr status func (p *KopiaProgress) UpdateProgress() { if p.outputThrottle.ShouldOutput() { - p.UpFunc(uploader.UploaderProgress{ - TotalBytes: atomic.LoadInt64(&p.estimatedTotalBytes), - BytesDone: atomic.LoadInt64(&p.processedBytes), - }) + p.Updater.UpdateProgress(&velerov1api.UploaderProgress{TotalBytes: p.estimatedTotalBytes, BytesDone: p.processedBytes}) } } diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go new file mode 100644 index 000000000..5b6e338b4 --- /dev/null +++ b/pkg/uploader/kopia/snapshot.go @@ -0,0 +1,286 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kopia + +import ( + "context" + "fmt" + "math" + "os" + "path/filepath" + "runtime" + "strings" + "time" + + "github.com/sirupsen/logrus" + + "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/service" + "github.com/vmware-tanzu/velero/pkg/uploader" + + "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/fs/localfs" + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/manifest" + "github.com/kopia/kopia/snapshot" + "github.com/kopia/kopia/snapshot/policy" + "github.com/kopia/kopia/snapshot/restore" + "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/pkg/errors" +) + +//All function mainly used to make testing more convenient +var treeForSourceFunc = policy.TreeForSource +var applyRetentionPolicyFunc = policy.ApplyRetentionPolicy +var setPolicyFunc = policy.SetPolicy +var saveSnapshotFunc = snapshot.SaveSnapshot +var loadSnapshotFunc = snapshot.LoadSnapshot + +//SnapshotUploader which mainly used for UT test that could overwrite Upload interface +type SnapshotUploader interface { + Upload( + ctx context.Context, + source fs.Entry, + policyTree *policy.Tree, + sourceInfo snapshot.SourceInfo, + previousManifests ...*snapshot.Manifest, + ) (*snapshot.Manifest, error) +} + +func newOptionalInt(b policy.OptionalInt) *policy.OptionalInt { + return &b +} + +//setupDefaultPolicy set default policy for kopia +func setupDefaultPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceInfo snapshot.SourceInfo) error { + return setPolicyFunc(ctx, rep, sourceInfo, &policy.Policy{ + RetentionPolicy: policy.RetentionPolicy{ + KeepLatest: newOptionalInt(math.MaxInt32), + }, + CompressionPolicy: policy.CompressionPolicy{ + CompressorName: "none", + }, + UploadPolicy: policy.UploadPolicy{ + MaxParallelFileReads: newOptionalInt(policy.OptionalInt(runtime.NumCPU())), + }, + SchedulingPolicy: policy.SchedulingPolicy{ + Manual: true, + }, + }) +} + +//Backup backup specific sourcePath and update progress +func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, + parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) { + if fsUploader == nil { + return nil, fmt.Errorf("get empty kopia uploader") + } + dir, err := filepath.Abs(sourcePath) + if err != nil { + return nil, errors.Wrapf(err, "Invalid source path '%s'", sourcePath) + } + + sourceInfo := snapshot.SourceInfo{ + Path: filepath.Clean(dir), + } + sourceInfo.UserName, sourceInfo.Host = service.GetRepoUser() + rootDir, err := getLocalFSEntry(sourceInfo.Path) + if err != nil { + return nil, errors.Wrap(err, "Unable to get local filesystem entry") + } + snapID, snapshotSize, err := SnapshotSource(ctx, repoWriter, fsUploader, sourceInfo, rootDir, parentSnapshot, log, "Kopia Uploader") + if err != nil { + return nil, err + } + + snapshotInfo := &uploader.SnapshotInfo{ + ID: snapID, + Size: snapshotSize, + } + + return snapshotInfo, nil +} + +func getLocalFSEntry(path0 string) (fs.Entry, error) { + path, err := resolveSymlink(path0) + if err != nil { + return nil, errors.Wrap(err, "resolveSymlink") + } + + e, err := localfs.NewEntry(path) + if err != nil { + return nil, errors.Wrap(err, "can't get local fs entry") + } + + return e, nil +} + +//resolveSymlink returns the path name after the evaluation of any symbolic links +func resolveSymlink(path string) (string, error) { + st, err := os.Lstat(path) + if err != nil { + return "", errors.Wrap(err, "stat") + } + + if (st.Mode() & os.ModeSymlink) == 0 { + return path, nil + } + + return filepath.EvalSymlinks(path) +} + +//SnapshotSource which setup policy for snapshot, upload snapshot, update progress +func SnapshotSource( + ctx context.Context, + rep repo.RepositoryWriter, + u SnapshotUploader, + sourceInfo snapshot.SourceInfo, + rootDir fs.Entry, + parentSnapshot string, + log logrus.FieldLogger, + description string, +) (string, int64, error) { + log.Info("Start to snapshot...") + snapshotStartTime := time.Now() + + var previous []*snapshot.Manifest + if parentSnapshot != "" { + mani, err := loadSnapshotFunc(ctx, rep, manifest.ID(parentSnapshot)) + if err != nil { + return "", 0, errors.Wrapf(err, "Failed to load previous snapshot %v from kopia", parentSnapshot) + } + + previous = append(previous, mani) + } else { + pre, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo, nil) + if err != nil { + return "", 0, errors.Wrapf(err, "Failed to find previous kopia snapshot manifests for si %v", sourceInfo) + } + + previous = pre + } + var manifest *snapshot.Manifest + if err := setupDefaultPolicy(ctx, rep, sourceInfo); err != nil { + return "", 0, errors.Wrapf(err, "unable to set policy for si %v", sourceInfo) + } + + policyTree, err := treeForSourceFunc(ctx, rep, sourceInfo) + if err != nil { + return "", 0, errors.Wrapf(err, "unable to create policy getter for si %v", sourceInfo) + } + + manifest, err = u.Upload(ctx, rootDir, policyTree, sourceInfo, previous...) + if err != nil { + return "", 0, errors.Wrapf(err, "Failed to upload the kopia snapshot for si %v", sourceInfo) + } + + manifest.Description = description + + if _, err = saveSnapshotFunc(ctx, rep, manifest); err != nil { + return "", 0, errors.Wrapf(err, "Failed to save kopia manifest %v", manifest.ID) + } + _, err = applyRetentionPolicyFunc(ctx, rep, sourceInfo, true) + if err != nil { + return "", 0, errors.Wrapf(err, "Failed to apply kopia retention policy for si %v", sourceInfo) + } + if err = rep.Flush(ctx); err != nil { + return "", 0, errors.Wrapf(err, "Failed to flush kopia repository") + } + log.Infof("Created snapshot with root %v and ID %v in %v", manifest.RootObjectID(), manifest.ID, time.Since(snapshotStartTime).Truncate(time.Second)) + return reportSnapshotStatus(manifest) +} + +func reportSnapshotStatus(manifest *snapshot.Manifest) (string, int64, error) { + manifestID := manifest.ID + snapSize := manifest.Stats.TotalFileSize + + var errs []string + if ds := manifest.RootEntry.DirSummary; ds != nil { + for _, ent := range ds.FailedEntries { + errs = append(errs, ent.Error) + } + } + if len(errs) != 0 { + return "", 0, errors.New(strings.Join(errs, "\n")) + } + + return string(manifestID), snapSize, nil +} + +// findPreviousSnapshotManifest returns the list of previous snapshots for a given source, including +// last complete snapshot following it. +func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sourceInfo snapshot.SourceInfo, noLaterThan *time.Time) ([]*snapshot.Manifest, error) { + man, err := snapshot.ListSnapshots(ctx, rep, sourceInfo) + if err != nil { + return nil, err + } + + var previousComplete *snapshot.Manifest + var result []*snapshot.Manifest + + for _, p := range man { + if noLaterThan != nil && p.StartTime.After(*noLaterThan) { + continue + } + + if p.IncompleteReason == "" && (previousComplete == nil || p.StartTime.After(previousComplete.StartTime)) { + previousComplete = p + } + } + + if previousComplete != nil { + result = append(result, previousComplete) + } + + return result, nil +} + +//Restore restore specific sourcePath with given snapshotID and update progress +func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) { + log.Info("Start to restore...") + + rootEntry, err := snapshotfs.FilesystemEntryFromIDWithPath(ctx, rep, snapshotID, false) + if err != nil { + return 0, 0, errors.Wrapf(err, "Unable to get filesystem entry for snapshot %v", snapshotID) + } + + path, err := filepath.Abs(dest) + if err != nil { + return 0, 0, errors.Wrapf(err, "Unable to resolve path %v", dest) + } + + output := &restore.FilesystemOutput{ + TargetPath: path, + OverwriteDirectories: true, + OverwriteFiles: true, + OverwriteSymlinks: true, + IgnorePermissionErrors: true, + } + + stat, err := restore.Entry(ctx, rep, output, rootEntry, restore.Options{ + Parallel: runtime.NumCPU(), + RestoreDirEntryAtDepth: math.MaxInt32, + Cancel: cancleCh, + ProgressCallback: func(ctx context.Context, stats restore.Stats) { + progress.ProgressBytes(stats.RestoredTotalFileSize, stats.EnqueuedTotalFileSize) + }, + }) + + if err != nil { + return 0, 0, errors.Wrapf(err, "Failed to copy snapshot data to the target") + } + return stat.RestoredTotalFileSize, stat.RestoredFileCount, nil +} diff --git a/pkg/uploader/kopia/snapshot_test.go b/pkg/uploader/kopia/snapshot_test.go new file mode 100644 index 000000000..3a60658c9 --- /dev/null +++ b/pkg/uploader/kopia/snapshot_test.go @@ -0,0 +1,199 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kopia + +import ( + "context" + "testing" + + "github.com/kopia/kopia/snapshot" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + repomocks "github.com/vmware-tanzu/velero/pkg/repository/mocks" + "github.com/vmware-tanzu/velero/pkg/uploader" + uploadermocks "github.com/vmware-tanzu/velero/pkg/uploader/mocks" +) + +type snapshotMockes struct { + policyMock *uploadermocks.Policy + snapshotMock *uploadermocks.Snapshot + uploderMock *uploadermocks.Uploader + repoWriterMock *repomocks.RepositoryWriter +} + +type mockArgs struct { + methodName string + returns []interface{} +} + +func InjectSnapshotFuncs() *snapshotMockes { + s := &snapshotMockes{ + policyMock: &uploadermocks.Policy{}, + snapshotMock: &uploadermocks.Snapshot{}, + uploderMock: &uploadermocks.Uploader{}, + repoWriterMock: &repomocks.RepositoryWriter{}, + } + + setPolicyFunc = s.policyMock.SetPolicy + treeForSourceFunc = s.policyMock.TreeForSource + applyRetentionPolicyFunc = s.policyMock.ApplyRetentionPolicy + loadSnapshotFunc = s.snapshotMock.LoadSnapshot + saveSnapshotFunc = s.snapshotMock.SaveSnapshot + return s +} + +func MockFuncs(s *snapshotMockes, args []mockArgs) { + s.snapshotMock.On("LoadSnapshot", mock.Anything, mock.Anything, mock.Anything).Return(args[0].returns...) + s.snapshotMock.On("SaveSnapshot", mock.Anything, mock.Anything, mock.Anything).Return(args[1].returns...) + s.policyMock.On("TreeForSource", mock.Anything, mock.Anything, mock.Anything).Return(args[2].returns...) + s.policyMock.On("ApplyRetentionPolicy", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(args[3].returns...) + s.policyMock.On("SetPolicy", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(args[4].returns...) + s.uploderMock.On("Upload", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(args[5].returns...) + s.repoWriterMock.On("Flush", mock.Anything).Return(args[6].returns...) +} + +func TestSnapshotSource(t *testing.T) { + + ctx := context.TODO() + sourceInfo := snapshot.SourceInfo{ + UserName: "testUserName", + Host: "testHost", + Path: "/var", + } + rootDir, err := getLocalFSEntry(sourceInfo.Path) + assert.NoError(t, err) + log := logrus.New() + manifest := &snapshot.Manifest{ + ID: "test", + RootEntry: &snapshot.DirEntry{}, + } + + testCases := []struct { + name string + args []mockArgs + notError bool + }{ + { + name: "regular test", + args: []mockArgs{ + {methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}}, + {methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}}, + {methodName: "TreeForSource", returns: []interface{}{nil, nil}}, + {methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}}, + {methodName: "SetPolicy", returns: []interface{}{nil}}, + {methodName: "Upload", returns: []interface{}{manifest, nil}}, + {methodName: "Flush", returns: []interface{}{nil}}, + }, + notError: true, + }, + { + name: "failed to load snapshot", + args: []mockArgs{ + {methodName: "LoadSnapshot", returns: []interface{}{manifest, errors.New("failed to load snapshot")}}, + {methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}}, + {methodName: "TreeForSource", returns: []interface{}{nil, nil}}, + {methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}}, + {methodName: "SetPolicy", returns: []interface{}{nil}}, + {methodName: "Upload", returns: []interface{}{manifest, nil}}, + {methodName: "Flush", returns: []interface{}{nil}}, + }, + notError: false, + }, + { + name: "failed to save snapshot", + args: []mockArgs{ + {methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}}, + {methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, errors.New("failed to save snapshot")}}, + {methodName: "TreeForSource", returns: []interface{}{nil, nil}}, + {methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}}, + {methodName: "SetPolicy", returns: []interface{}{nil}}, + {methodName: "Upload", returns: []interface{}{manifest, nil}}, + {methodName: "Flush", returns: []interface{}{nil}}, + }, + notError: false, + }, + { + name: "failed to apply policy", + args: []mockArgs{ + {methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}}, + {methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}}, + {methodName: "TreeForSource", returns: []interface{}{nil, nil}}, + {methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, errors.New("failed to save snapshot")}}, + {methodName: "SetPolicy", returns: []interface{}{nil}}, + {methodName: "Upload", returns: []interface{}{manifest, nil}}, + {methodName: "Flush", returns: []interface{}{nil}}, + }, + notError: false, + }, + { + name: "failed to set policy", + args: []mockArgs{ + {methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}}, + {methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}}, + {methodName: "TreeForSource", returns: []interface{}{nil, nil}}, + {methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}}, + {methodName: "SetPolicy", returns: []interface{}{errors.New("failed to set policy")}}, + {methodName: "Upload", returns: []interface{}{manifest, nil}}, + {methodName: "Flush", returns: []interface{}{nil}}, + }, + notError: false, + }, + { + name: "failed to upload snapshot", + args: []mockArgs{ + {methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}}, + {methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}}, + {methodName: "TreeForSource", returns: []interface{}{nil, nil}}, + {methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}}, + {methodName: "SetPolicy", returns: []interface{}{nil}}, + {methodName: "Upload", returns: []interface{}{manifest, errors.New("failed to upload snapshot")}}, + {methodName: "Flush", returns: []interface{}{nil}}, + }, + notError: false, + }, + { + name: "failed to flush repo", + args: []mockArgs{ + {methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}}, + {methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, errors.New("failed to save snapshot")}}, + {methodName: "TreeForSource", returns: []interface{}{nil, nil}}, + {methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}}, + {methodName: "SetPolicy", returns: []interface{}{nil}}, + {methodName: "Upload", returns: []interface{}{manifest, nil}}, + {methodName: "Flush", returns: []interface{}{errors.New("failed to flush repo")}}, + }, + notError: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + s := InjectSnapshotFuncs() + MockFuncs(s, tc.args) + _, _, err = SnapshotSource(ctx, s.repoWriterMock, s.uploderMock, sourceInfo, rootDir, "/", log, "TestSnapshotSource", func(up uploader.UploaderProgress) {}) + if tc.notError { + assert.NoError(t, err) + } else { + assert.Error(t, err) + } + }) + } + +} diff --git a/pkg/uploader/mocks/policy.go b/pkg/uploader/mocks/policy.go new file mode 100644 index 000000000..3c1dcdd78 --- /dev/null +++ b/pkg/uploader/mocks/policy.go @@ -0,0 +1,92 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mocks + +import ( + "context" + + "github.com/kopia/kopia/snapshot/policy" + "github.com/stretchr/testify/mock" + + "github.com/kopia/kopia/repo" + + "github.com/kopia/kopia/snapshot" +) + +// policy is an autogenerated mock type for the TreeForSource type +type Policy struct { + mock.Mock +} + +// Execute provides a mock function with given fields: ctx, rep, si +func (_m *Policy) TreeForSource(ctx context.Context, rep repo.Repository, si snapshot.SourceInfo) (*policy.Tree, error) { + ret := _m.Called(ctx, rep, si) + + var r0 *policy.Tree + if rf, ok := ret.Get(0).(func(context.Context, repo.Repository, snapshot.SourceInfo) *policy.Tree); ok { + r0 = rf(ctx, rep, si) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*policy.Tree) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, repo.Repository, snapshot.SourceInfo) error); ok { + r1 = rf(ctx, rep, si) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ApplyRetentionPolicy provides a mock function with given fields: ctx, rep, sourceInfo, reallyDelete +func (_m *Policy) ApplyRetentionPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceInfo snapshot.SourceInfo, reallyDelete bool) ([]*snapshot.Manifest, error) { + ret := _m.Called(ctx, rep, sourceInfo, reallyDelete) + + var r0 []*snapshot.Manifest + if rf, ok := ret.Get(0).(func(context.Context, repo.RepositoryWriter, snapshot.SourceInfo, bool) []*snapshot.Manifest); ok { + r0 = rf(ctx, rep, sourceInfo, reallyDelete) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*snapshot.Manifest) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, repo.RepositoryWriter, snapshot.SourceInfo, bool) error); ok { + r1 = rf(ctx, rep, sourceInfo, reallyDelete) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +func (_m *Policy) SetPolicy(ctx context.Context, rep repo.RepositoryWriter, si snapshot.SourceInfo, pol *policy.Policy) error { + ret := _m.Called(ctx, rep, si, pol) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, repo.RepositoryWriter, snapshot.SourceInfo, *policy.Policy) error); ok { + r0 = rf(ctx, rep, si, pol) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/pkg/uploader/mocks/shim.go b/pkg/uploader/mocks/shim.go new file mode 100644 index 000000000..1ec3acc58 --- /dev/null +++ b/pkg/uploader/mocks/shim.go @@ -0,0 +1,42 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// shimRepository is an autogenerated mock type for the shimRepository type +type ShimRepository struct { + mock.Mock +} + +// Flush provides a mock function with given fields: ctx +func (_m *ShimRepository) Flush(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/pkg/uploader/mocks/snapshot.go b/pkg/uploader/mocks/snapshot.go new file mode 100644 index 000000000..c651242eb --- /dev/null +++ b/pkg/uploader/mocks/snapshot.go @@ -0,0 +1,76 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mocks + +import ( + "context" + + "github.com/kopia/kopia/repo/manifest" + "github.com/kopia/kopia/snapshot" + "github.com/stretchr/testify/mock" + + "github.com/kopia/kopia/repo" +) + +// snapshot is an autogenerated mock type for the snapshot type +type Snapshot struct { + mock.Mock +} + +// LoadSnapshot provides a mock function with given fields: ctx, rep, manifestID +func (_m *Snapshot) LoadSnapshot(ctx context.Context, rep repo.Repository, manifestID manifest.ID) (*snapshot.Manifest, error) { + ret := _m.Called(ctx, rep, manifestID) + + var r0 *snapshot.Manifest + if rf, ok := ret.Get(0).(func(context.Context, repo.Repository, manifest.ID) *snapshot.Manifest); ok { + r0 = rf(ctx, rep, manifestID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*snapshot.Manifest) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, repo.Repository, manifest.ID) error); ok { + r1 = rf(ctx, rep, manifestID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// SaveSnapshot provides a mock function with given fields: ctx, rep, man +func (_m *Snapshot) SaveSnapshot(ctx context.Context, rep repo.RepositoryWriter, man *snapshot.Manifest) (manifest.ID, error) { + ret := _m.Called(ctx, rep, man) + + var r0 manifest.ID + if rf, ok := ret.Get(0).(func(context.Context, repo.RepositoryWriter, *snapshot.Manifest) manifest.ID); ok { + r0 = rf(ctx, rep, man) + } else { + r0 = ret.Get(0).(manifest.ID) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, repo.RepositoryWriter, *snapshot.Manifest) error); ok { + r1 = rf(ctx, rep, man) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/pkg/uploader/mocks/uploader.go b/pkg/uploader/mocks/uploader.go new file mode 100644 index 000000000..d8b5fa2fd --- /dev/null +++ b/pkg/uploader/mocks/uploader.go @@ -0,0 +1,63 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mocks + +import ( + "context" + + "github.com/kopia/kopia/fs" + "github.com/stretchr/testify/mock" + + "github.com/kopia/kopia/snapshot/policy" + + "github.com/kopia/kopia/snapshot" +) + +// Upload is an autogenerated mock type for the Upload type +type Uploader struct { + mock.Mock +} + +// Execute provides a mock function with given fields: ctx, source, policyTree, sourceInfo, previousManifests +func (_m *Uploader) Upload(ctx context.Context, source fs.Entry, policyTree *policy.Tree, sourceInfo snapshot.SourceInfo, previousManifests ...*snapshot.Manifest) (*snapshot.Manifest, error) { + _va := make([]interface{}, len(previousManifests)) + for _i := range previousManifests { + _va[_i] = previousManifests[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, source, policyTree, sourceInfo) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *snapshot.Manifest + if rf, ok := ret.Get(0).(func(context.Context, fs.Entry, *policy.Tree, snapshot.SourceInfo, ...*snapshot.Manifest) *snapshot.Manifest); ok { + r0 = rf(ctx, source, policyTree, sourceInfo, previousManifests...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*snapshot.Manifest) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, fs.Entry, *policy.Tree, snapshot.SourceInfo, ...*snapshot.Manifest) error); ok { + r1 = rf(ctx, source, policyTree, sourceInfo, previousManifests...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/pkg/uploader/provider/kopia.go b/pkg/uploader/provider/kopia.go new file mode 100644 index 000000000..6bdd1a446 --- /dev/null +++ b/pkg/uploader/provider/kopia.go @@ -0,0 +1,204 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provider + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + "github.com/vmware-tanzu/velero/pkg/uploader/kopia" + + "github.com/vmware-tanzu/velero/internal/credentials" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + repokeys "github.com/vmware-tanzu/velero/pkg/repository/keys" + "github.com/vmware-tanzu/velero/pkg/repository/udmrepo" + "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/service" +) + +//BackupFunc mainly used to make testing more convenient +var BackupFunc = kopia.Backup +var RestoreFunc = kopia.Restore + +//kopiaProvider recorded info related with kopiaProvider +//action which means provider handle backup or restore +type kopiaProvider struct { + bkRepo udmrepo.BackupRepo + credGetter *credentials.CredentialGetter + uploader *snapshotfs.Uploader + restoreCancel chan struct{} + log logrus.FieldLogger +} + +//NewKopiaUploaderProvider initialized with open or create a repository +func NewKopiaUploaderProvider( + ctx context.Context, + credGetter *credentials.CredentialGetter, + bsl *velerov1api.BackupStorageLocation, + log logrus.FieldLogger, +) (Provider, error) { + kp := &kopiaProvider{ + log: log, + credGetter: credGetter, + } + //repoUID which is used to generate kopia repository config with unique directory path + repoUID := string(bsl.GetUID()) + repoOpt, err := udmrepo.NewRepoOptions( + udmrepo.WithPassword(kp, ""), + udmrepo.WithConfigFile("", repoUID), + udmrepo.WithDescription("Initial kopia uploader provider"), + ) + if err != nil { + return nil, errors.Wrapf(err, "error to get repo options") + } + + repoSvc := service.Create(log) + log.WithField("repoUID", repoUID).Info("Opening backup repo") + + kp.bkRepo, err = repoSvc.Open(ctx, *repoOpt) + if err != nil { + return nil, errors.Wrapf(err, "Failed to find kopia repository") + } + return kp, nil +} + +//CheckContext check context status periodically +//check if context is timeout or cancel +func (kp *kopiaProvider) CheckContext(ctx context.Context) { + for { + select { + case <-ctx.Done(): + if kp.uploader != nil { + kp.uploader.Cancel() + kp.log.Infof("Backup is been canceled") + } + if kp.restoreCancel != nil { + close(kp.restoreCancel) + kp.log.Infof("Restore is been canceled") + } + return + default: + time.Sleep(time.Second * 10) + } + } +} + +func (kp *kopiaProvider) Close(ctx context.Context) { + kp.bkRepo.Close(ctx) +} + +//RunBackup which will backup specific path and update backup progress in pvb status +func (kp *kopiaProvider) RunBackup( + ctx context.Context, + path string, + tags map[string]string, + parentSnapshot string, + updater velerov1api.ProgressUpdater) (string, error) { + if updater == nil { + return "", errors.New("Need to inital backup progress updater first") + } + + log := kp.log.WithFields(logrus.Fields{ + "path": path, + "parentSnapshot": parentSnapshot, + }) + repoWriter := kopia.NewShimRepo(kp.bkRepo) + kp.uploader = snapshotfs.NewUploader(repoWriter) + prorgess := new(kopia.KopiaProgress) + prorgess.InitThrottle(backupProgressCheckInterval) + prorgess.Updater = updater + kp.uploader.Progress = prorgess + + log.Info("Starting backup") + go kp.CheckContext(ctx) + + snapshotInfo, err := BackupFunc(ctx, kp.uploader, repoWriter, path, parentSnapshot, log) + + if err != nil { + return "", errors.Wrapf(err, "Failed to run kopia backup") + } else if snapshotInfo == nil { + return "", fmt.Errorf("failed to get kopia backup snapshot info for path %v", path) + } + + updater.UpdateProgress( + &velerov1api.UploaderProgress{ + TotalBytes: snapshotInfo.Size, + BytesDone: snapshotInfo.Size, + }, + ) + + log.Debugf("Kopia backup finished, snapshot ID %s, backup size %d", snapshotInfo.ID, snapshotInfo.Size) + return snapshotInfo.ID, nil +} + +func (kp *kopiaProvider) GetPassword(param interface{}) (string, error) { + if kp.credGetter.FromSecret == nil { + return "", errors.New("invalid credentials interface") + } + rawPass, err := kp.credGetter.FromSecret.Get(repokeys.RepoKeySelector()) + if err != nil { + return "", errors.Wrap(err, "error to get password") + } + + return strings.TrimSpace(rawPass), nil +} + +//RunRestore which will restore specific path and update restore progress in pvr status +func (kp *kopiaProvider) RunRestore( + ctx context.Context, + snapshotID string, + volumePath string, + updater velerov1api.ProgressUpdater) error { + log := kp.log.WithFields(logrus.Fields{ + "snapshotID": snapshotID, + "volumePath": volumePath, + }) + repoWriter := kopia.NewShimRepo(kp.bkRepo) + prorgess := new(kopia.KopiaProgress) + prorgess.InitThrottle(restoreProgressCheckInterval) + prorgess.Updater = updater + kp.restoreCancel = make(chan struct{}) + defer func() { + if kp.restoreCancel != nil { + close(kp.restoreCancel) + } + }() + + log.Info("Starting restore") + go kp.CheckContext(ctx) + size, fileCount, err := RestoreFunc(ctx, repoWriter, prorgess, snapshotID, volumePath, log, kp.restoreCancel) + + if err != nil { + return errors.Wrapf(err, "Failed to run kopia restore") + } + + updater.UpdateProgress(&velerov1api.UploaderProgress{ + TotalBytes: size, + BytesDone: size, + }) + + output := fmt.Sprintf("Kopia restore finished, restore size %d, file count %d", size, fileCount) + + log.Info(output) + + return nil +} diff --git a/pkg/uploader/provider/kopia_test.go b/pkg/uploader/provider/kopia_test.go new file mode 100644 index 000000000..3656053d0 --- /dev/null +++ b/pkg/uploader/provider/kopia_test.go @@ -0,0 +1,118 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provider + +import ( + "context" + "testing" + + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme" + "github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/uploader/kopia" +) + +func TestRunBackup(t *testing.T) { + var kp kopiaProvider + kp.log = logrus.New() + fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme) + updater := velerov1api.NewBackupProgressUpdater(&velerov1api.PodVolumeBackup{}, kp.log, context.Background(), fakeClient) + testCases := []struct { + name string + hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) + notError bool + }{ + { + name: "success to backup", + hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) { + return &uploader.SnapshotInfo{}, nil + }, + notError: true, + }, + { + name: "get error to backup", + hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) { + return &uploader.SnapshotInfo{}, errors.New("failed to backup") + }, + notError: false, + }, + { + name: "got empty snapshot", + hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) { + return nil, nil + }, + notError: false, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + BackupFunc = tc.hookBackupFunc + _, err := kp.RunBackup(context.Background(), "var", nil, "", updater) + if tc.notError { + assert.NoError(t, err) + } else { + assert.Error(t, err) + } + }) + } +} + +func TestRunRestore(t *testing.T) { + var kp kopiaProvider + kp.log = logrus.New() + updater := velerov1api.NewRestoreProgressUpdater(&velerov1api.PodVolumeRestore{}, kp.log, context.Background(), fake.NewFakeClientWithScheme(scheme.Scheme)) + + testCases := []struct { + name string + hookRestoreFunc func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) + notError bool + }{ + { + name: "normal restore", + hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) { + return 0, 0, nil + }, + notError: true, + }, + { + name: "failed to restore", + hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) { + return 0, 0, errors.New("failed to restore") + }, + notError: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + RestoreFunc = tc.hookRestoreFunc + err := kp.RunRestore(context.Background(), "", "/var", updater) + if tc.notError { + assert.NoError(t, err) + } else { + assert.Error(t, err) + } + }) + } +} diff --git a/pkg/uploader/provider/provider.go b/pkg/uploader/provider/provider.go index 5a90a806f..c6fe0a177 100644 --- a/pkg/uploader/provider/provider.go +++ b/pkg/uploader/provider/provider.go @@ -18,27 +18,53 @@ package provider import ( "context" + "time" + "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + + "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/uploader" ) +const restoreProgressCheckInterval = 10 * time.Second +const backupProgressCheckInterval = 10 * time.Second + // Provider which is designed for one pod volumn to do the backup or restore type Provider interface { // RunBackup which will do backup for one specific volumn and return snapshotID error - // updateFunc which is used for update backup progress into related pvb status + // updater which is used for update backup progress into related pvb status RunBackup( ctx context.Context, path string, tags map[string]string, parentSnapshot string, - updateFunc func(velerov1api.PodVolumeOperationProgress)) (string, error) + updater velerov1api.ProgressUpdater) (string, error) // RunRestore which will do restore for one specific volumn with given snapshot id and return error // updateFunc which is used for update restore progress into related pvr status RunRestore( ctx context.Context, snapshotID string, volumePath string, - updateFunc func(velerov1api.PodVolumeOperationProgress)) error + updater velerov1api.ProgressUpdater) error // Close which will close related repository Close(ctx context.Context) } + +//NewUploaderProvider initialize provider with specific uploader_type +func NewUploaderProvider( + ctx context.Context, + uploader_type string, + repoIdentifier string, + bsl *velerov1api.BackupStorageLocation, + credGetter *credentials.CredentialGetter, + repoKeySelector *v1.SecretKeySelector, + log logrus.FieldLogger, +) (Provider, error) { + if uploader_type == uploader.KopiaType { + return NewResticUploaderProvider(repoIdentifier, bsl, credGetter, repoKeySelector, log) + } else { + return NewKopiaUploaderProvider(ctx, credGetter, bsl, log) + } +} diff --git a/pkg/uploader/provider/restic.go b/pkg/uploader/provider/restic.go new file mode 100644 index 000000000..f908077a3 --- /dev/null +++ b/pkg/uploader/provider/restic.go @@ -0,0 +1,34 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package provider + +import ( + "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + + "github.com/vmware-tanzu/velero/internal/credentials" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" +) + +func NewResticUploaderProvider( + repoIdentifier string, + bsl *velerov1api.BackupStorageLocation, + credGetter *credentials.CredentialGetter, + repoKeySelector *v1.SecretKeySelector, + log logrus.FieldLogger, +) (Provider, error) { + return nil, nil //TODO +} diff --git a/pkg/uploader/types.go b/pkg/uploader/types.go index 134e36cce..4f5274a97 100644 --- a/pkg/uploader/types.go +++ b/pkg/uploader/types.go @@ -22,10 +22,8 @@ import ( ) const ( - ResticType = "restic" - KopiaType = "kopia" - VeleroBackup = "backup" - VeleroRestore = "restore" + ResticType = "restic" + KopiaType = "kopia" ) // ValidateUploaderType validates if the input param is a valid uploader type. @@ -42,8 +40,3 @@ type SnapshotInfo struct { ID string `json:"id"` Size int64 `json:"Size"` } - -type UploaderProgress struct { - TotalBytes int64 `json:"totalBytes,omitempty"` - BytesDone int64 `json:"doneBytes,omitempty"` -}