mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-04-27 02:55:09 +00:00
Merge pull request #5221 from qiuming-best/uploader-kopia
Uploader Implementation: Kopia backup and restore
This commit is contained in:
1
changelogs/unreleased/5221-qiuming-best
Normal file
1
changelogs/unreleased/5221-qiuming-best
Normal file
@@ -0,0 +1 @@
|
||||
Uploader Implementation: Kopia backup and restore
|
||||
1
go.sum
1
go.sum
@@ -552,6 +552,7 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/kubernetes-csi/external-snapshotter/client/v4 v4.2.0 h1:nHHjmvjitIiyPlUHk/ofpgvBcNcawJLtf4PYHORLjAA=
|
||||
github.com/kubernetes-csi/external-snapshotter/client/v4 v4.2.0/go.mod h1:YBCo4DoEeDndqvAn6eeu0vWM7QdXmHEeI9cFWplmBys=
|
||||
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
|
||||
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
|
||||
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
|
||||
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
|
||||
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
|
||||
|
||||
@@ -486,7 +486,7 @@ func (v *volumesByPod) Add(namespace, name, volume, phase string, progress veler
|
||||
key := fmt.Sprintf("%s/%s", namespace, name)
|
||||
|
||||
// append backup progress percentage if backup is in progress
|
||||
if phase == "In Progress" && progress != (velerov1api.PodVolumeOperationProgress{}) {
|
||||
if phase == "In Progress" && progress.TotalBytes != 0 {
|
||||
volume = fmt.Sprintf("%s (%.2f%%)", volume, float64(progress.BytesDone)/float64(progress.TotalBytes)*100)
|
||||
}
|
||||
|
||||
|
||||
@@ -38,6 +38,7 @@ import (
|
||||
"github.com/vmware-tanzu/velero/pkg/metrics"
|
||||
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
|
||||
"github.com/vmware-tanzu/velero/pkg/restic"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
)
|
||||
@@ -61,6 +62,13 @@ type PodVolumeBackupReconciler struct {
|
||||
Log logrus.FieldLogger
|
||||
}
|
||||
|
||||
type BackupProgressUpdater struct {
|
||||
PodVolumeBackup *velerov1api.PodVolumeBackup
|
||||
Log logrus.FieldLogger
|
||||
Ctx context.Context
|
||||
Cli client.Client
|
||||
}
|
||||
|
||||
// +kubebuilder:rbac:groups=velero.io,resources=podvolumebackups,verbs=get;list;watch;create;update;patch;delete
|
||||
// +kubebuilder:rbac:groups=velero.io,resources=podvolumebackups/status,verbs=get;update;patch
|
||||
func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||||
@@ -364,3 +372,20 @@ func (r *PodVolumeBackupReconciler) buildResticCommand(ctx context.Context, log
|
||||
|
||||
return cmd, nil
|
||||
}
|
||||
|
||||
func (r *PodVolumeBackupReconciler) NewBackupProgressUpdater(pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger, ctx context.Context) *BackupProgressUpdater {
|
||||
return &BackupProgressUpdater{pvb, log, ctx, r.Client}
|
||||
}
|
||||
|
||||
//UpdateProgress which implement ProgressUpdater interface to update pvb progress status
|
||||
func (b *BackupProgressUpdater) UpdateProgress(p *uploader.UploaderProgress) {
|
||||
original := b.PodVolumeBackup.DeepCopy()
|
||||
b.PodVolumeBackup.Status.Progress = velerov1api.PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}
|
||||
if b.Cli == nil {
|
||||
b.Log.Errorf("failed to update backup pod %s volume %s progress with uninitailize client", b.PodVolumeBackup.Spec.Pod.Name, b.PodVolumeBackup.Spec.Volume)
|
||||
return
|
||||
}
|
||||
if err := b.Cli.Patch(b.Ctx, b.PodVolumeBackup, client.MergeFrom(original)); err != nil {
|
||||
b.Log.Errorf("update backup pod %s volume %s progress with %v", b.PodVolumeBackup.Spec.Pod.Name, b.PodVolumeBackup.Spec.Volume, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,6 +41,7 @@ import (
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
|
||||
"github.com/vmware-tanzu/velero/pkg/restic"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
|
||||
"github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
@@ -64,6 +65,13 @@ type PodVolumeRestoreReconciler struct {
|
||||
clock clock.Clock
|
||||
}
|
||||
|
||||
type RestoreProgressUpdater struct {
|
||||
PodVolumeRestore *velerov1api.PodVolumeRestore
|
||||
Log logrus.FieldLogger
|
||||
Ctx context.Context
|
||||
Cli client.Client
|
||||
}
|
||||
|
||||
// +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores,verbs=get;list;watch;create;update;patch;delete
|
||||
// +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores/status,verbs=get;update;patch
|
||||
// +kubebuilder:rbac:groups="",resources=pods,verbs=get
|
||||
@@ -329,3 +337,20 @@ func (c *PodVolumeRestoreReconciler) updateRestoreProgressFunc(req *velerov1api.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *PodVolumeRestoreReconciler) NewRestoreProgressUpdater(pvr *velerov1api.PodVolumeRestore, log logrus.FieldLogger, ctx context.Context) *RestoreProgressUpdater {
|
||||
return &RestoreProgressUpdater{pvr, log, ctx, r.Client}
|
||||
}
|
||||
|
||||
//UpdateProgress which implement ProgressUpdater interface to update pvr progress status
|
||||
func (r *RestoreProgressUpdater) UpdateProgress(p *uploader.UploaderProgress) {
|
||||
original := r.PodVolumeRestore.DeepCopy()
|
||||
r.PodVolumeRestore.Status.Progress = velerov1api.PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}
|
||||
if r.Cli == nil {
|
||||
r.Log.Errorf("failed to update restore pod %s volume %s progress with uninitailize client", r.PodVolumeRestore.Spec.Pod.Name, r.PodVolumeRestore.Spec.Volume)
|
||||
return
|
||||
}
|
||||
if err := r.Cli.Patch(r.Ctx, r.PodVolumeRestore, client.MergeFrom(original)); err != nil {
|
||||
r.Log.Errorf("update restore pod %s volume %s progress with %v", r.PodVolumeRestore.Spec.Pod.Name, r.PodVolumeRestore.Spec.Volume, err)
|
||||
}
|
||||
}
|
||||
|
||||
349
pkg/repository/mocks/repository_writer.go
Normal file
349
pkg/repository/mocks/repository_writer.go
Normal file
@@ -0,0 +1,349 @@
|
||||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
context "context"
|
||||
|
||||
index "github.com/kopia/kopia/repo/content/index"
|
||||
manifest "github.com/kopia/kopia/repo/manifest"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
|
||||
object "github.com/kopia/kopia/repo/object"
|
||||
|
||||
repo "github.com/kopia/kopia/repo"
|
||||
|
||||
time "time"
|
||||
)
|
||||
|
||||
// RepositoryWriter is an autogenerated mock type for the RepositoryWriter type
|
||||
type RepositoryWriter struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// ClientOptions provides a mock function with given fields:
|
||||
func (_m *RepositoryWriter) ClientOptions() repo.ClientOptions {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 repo.ClientOptions
|
||||
if rf, ok := ret.Get(0).(func() repo.ClientOptions); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Get(0).(repo.ClientOptions)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// Close provides a mock function with given fields: ctx
|
||||
func (_m *RepositoryWriter) Close(ctx context.Context) error {
|
||||
ret := _m.Called(ctx)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
|
||||
r0 = rf(ctx)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// ContentInfo provides a mock function with given fields: ctx, contentID
|
||||
func (_m *RepositoryWriter) ContentInfo(ctx context.Context, contentID index.ID) (index.Info, error) {
|
||||
ret := _m.Called(ctx, contentID)
|
||||
|
||||
var r0 index.Info
|
||||
if rf, ok := ret.Get(0).(func(context.Context, index.ID) index.Info); ok {
|
||||
r0 = rf(ctx, contentID)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(index.Info)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, index.ID) error); ok {
|
||||
r1 = rf(ctx, contentID)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// DeleteManifest provides a mock function with given fields: ctx, id
|
||||
func (_m *RepositoryWriter) DeleteManifest(ctx context.Context, id manifest.ID) error {
|
||||
ret := _m.Called(ctx, id)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, manifest.ID) error); ok {
|
||||
r0 = rf(ctx, id)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// FindManifests provides a mock function with given fields: ctx, labels
|
||||
func (_m *RepositoryWriter) FindManifests(ctx context.Context, labels map[string]string) ([]*manifest.EntryMetadata, error) {
|
||||
ret := _m.Called(ctx, labels)
|
||||
|
||||
var r0 []*manifest.EntryMetadata
|
||||
if rf, ok := ret.Get(0).(func(context.Context, map[string]string) []*manifest.EntryMetadata); ok {
|
||||
r0 = rf(ctx, labels)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]*manifest.EntryMetadata)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, map[string]string) error); ok {
|
||||
r1 = rf(ctx, labels)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// Flush provides a mock function with given fields: ctx
|
||||
func (_m *RepositoryWriter) Flush(ctx context.Context) error {
|
||||
ret := _m.Called(ctx)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
|
||||
r0 = rf(ctx)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// GetManifest provides a mock function with given fields: ctx, id, data
|
||||
func (_m *RepositoryWriter) GetManifest(ctx context.Context, id manifest.ID, data interface{}) (*manifest.EntryMetadata, error) {
|
||||
ret := _m.Called(ctx, id, data)
|
||||
|
||||
var r0 *manifest.EntryMetadata
|
||||
if rf, ok := ret.Get(0).(func(context.Context, manifest.ID, interface{}) *manifest.EntryMetadata); ok {
|
||||
r0 = rf(ctx, id, data)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*manifest.EntryMetadata)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, manifest.ID, interface{}) error); ok {
|
||||
r1 = rf(ctx, id, data)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// NewObjectWriter provides a mock function with given fields: ctx, opt
|
||||
func (_m *RepositoryWriter) NewObjectWriter(ctx context.Context, opt object.WriterOptions) object.Writer {
|
||||
ret := _m.Called(ctx, opt)
|
||||
|
||||
var r0 object.Writer
|
||||
if rf, ok := ret.Get(0).(func(context.Context, object.WriterOptions) object.Writer); ok {
|
||||
r0 = rf(ctx, opt)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(object.Writer)
|
||||
}
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// NewWriter provides a mock function with given fields: ctx, opt
|
||||
func (_m *RepositoryWriter) NewWriter(ctx context.Context, opt repo.WriteSessionOptions) (context.Context, repo.RepositoryWriter, error) {
|
||||
ret := _m.Called(ctx, opt)
|
||||
|
||||
var r0 context.Context
|
||||
if rf, ok := ret.Get(0).(func(context.Context, repo.WriteSessionOptions) context.Context); ok {
|
||||
r0 = rf(ctx, opt)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(context.Context)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 repo.RepositoryWriter
|
||||
if rf, ok := ret.Get(1).(func(context.Context, repo.WriteSessionOptions) repo.RepositoryWriter); ok {
|
||||
r1 = rf(ctx, opt)
|
||||
} else {
|
||||
if ret.Get(1) != nil {
|
||||
r1 = ret.Get(1).(repo.RepositoryWriter)
|
||||
}
|
||||
}
|
||||
|
||||
var r2 error
|
||||
if rf, ok := ret.Get(2).(func(context.Context, repo.WriteSessionOptions) error); ok {
|
||||
r2 = rf(ctx, opt)
|
||||
} else {
|
||||
r2 = ret.Error(2)
|
||||
}
|
||||
|
||||
return r0, r1, r2
|
||||
}
|
||||
|
||||
// OpenObject provides a mock function with given fields: ctx, id
|
||||
func (_m *RepositoryWriter) OpenObject(ctx context.Context, id object.ID) (object.Reader, error) {
|
||||
ret := _m.Called(ctx, id)
|
||||
|
||||
var r0 object.Reader
|
||||
if rf, ok := ret.Get(0).(func(context.Context, object.ID) object.Reader); ok {
|
||||
r0 = rf(ctx, id)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(object.Reader)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, object.ID) error); ok {
|
||||
r1 = rf(ctx, id)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// PrefetchContents provides a mock function with given fields: ctx, contentIDs, hint
|
||||
func (_m *RepositoryWriter) PrefetchContents(ctx context.Context, contentIDs []index.ID, hint string) []index.ID {
|
||||
ret := _m.Called(ctx, contentIDs, hint)
|
||||
|
||||
var r0 []index.ID
|
||||
if rf, ok := ret.Get(0).(func(context.Context, []index.ID, string) []index.ID); ok {
|
||||
r0 = rf(ctx, contentIDs, hint)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]index.ID)
|
||||
}
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// PrefetchObjects provides a mock function with given fields: ctx, objectIDs, hint
|
||||
func (_m *RepositoryWriter) PrefetchObjects(ctx context.Context, objectIDs []object.ID, hint string) ([]index.ID, error) {
|
||||
ret := _m.Called(ctx, objectIDs, hint)
|
||||
|
||||
var r0 []index.ID
|
||||
if rf, ok := ret.Get(0).(func(context.Context, []object.ID, string) []index.ID); ok {
|
||||
r0 = rf(ctx, objectIDs, hint)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]index.ID)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, []object.ID, string) error); ok {
|
||||
r1 = rf(ctx, objectIDs, hint)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// PutManifest provides a mock function with given fields: ctx, labels, payload
|
||||
func (_m *RepositoryWriter) PutManifest(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error) {
|
||||
ret := _m.Called(ctx, labels, payload)
|
||||
|
||||
var r0 manifest.ID
|
||||
if rf, ok := ret.Get(0).(func(context.Context, map[string]string, interface{}) manifest.ID); ok {
|
||||
r0 = rf(ctx, labels, payload)
|
||||
} else {
|
||||
r0 = ret.Get(0).(manifest.ID)
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, map[string]string, interface{}) error); ok {
|
||||
r1 = rf(ctx, labels, payload)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// Refresh provides a mock function with given fields: ctx
|
||||
func (_m *RepositoryWriter) Refresh(ctx context.Context) error {
|
||||
ret := _m.Called(ctx)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
|
||||
r0 = rf(ctx)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// Time provides a mock function with given fields:
|
||||
func (_m *RepositoryWriter) Time() time.Time {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 time.Time
|
||||
if rf, ok := ret.Get(0).(func() time.Time); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Get(0).(time.Time)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// UpdateDescription provides a mock function with given fields: d
|
||||
func (_m *RepositoryWriter) UpdateDescription(d string) {
|
||||
_m.Called(d)
|
||||
}
|
||||
|
||||
// VerifyObject provides a mock function with given fields: ctx, id
|
||||
func (_m *RepositoryWriter) VerifyObject(ctx context.Context, id object.ID) ([]index.ID, error) {
|
||||
ret := _m.Called(ctx, id)
|
||||
|
||||
var r0 []index.ID
|
||||
if rf, ok := ret.Get(0).(func(context.Context, object.ID) []index.ID); ok {
|
||||
r0 = rf(ctx, id)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]index.ID)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, object.ID) error); ok {
|
||||
r1 = rf(ctx, id)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
@@ -60,9 +60,9 @@ type KopiaProgress struct {
|
||||
estimatedFileCount int32 // +checklocksignore the total count of files to be processed
|
||||
estimatedTotalBytes int64 // +checklocksignore the total size of files to be processed
|
||||
// +checkatomic
|
||||
processedBytes int64 // which statistic all bytes has been processed currently
|
||||
outputThrottle Throttle // which control the frequency of update progress
|
||||
UpFunc func(uploader.UploaderProgress) //which called by UpdateProgress func, it is used to update pvb or pvr status
|
||||
processedBytes int64 // which statistic all bytes has been processed currently
|
||||
outputThrottle Throttle // which control the frequency of update progress
|
||||
Updater uploader.ProgressUpdater //which kopia progress will call the UpdateProgress interface, the third party will implement the interface to do the progress update
|
||||
}
|
||||
|
||||
//UploadedBytes the total bytes has uploaded currently
|
||||
@@ -90,13 +90,10 @@ func (p *KopiaProgress) EstimatedDataSize(fileCount int, totalBytes int64) {
|
||||
p.UpdateProgress()
|
||||
}
|
||||
|
||||
//UpdateProgress which called by UpdateProgress func, it is used to update pvb or pvr status
|
||||
//UpdateProgress which calls Updater UpdateProgress interface, update progress by third-party implementation
|
||||
func (p *KopiaProgress) UpdateProgress() {
|
||||
if p.outputThrottle.ShouldOutput() {
|
||||
p.UpFunc(uploader.UploaderProgress{
|
||||
TotalBytes: atomic.LoadInt64(&p.estimatedTotalBytes),
|
||||
BytesDone: atomic.LoadInt64(&p.processedBytes),
|
||||
})
|
||||
p.Updater.UpdateProgress(&uploader.UploaderProgress{TotalBytes: p.estimatedTotalBytes, BytesDone: p.processedBytes})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
287
pkg/uploader/kopia/snapshot.go
Normal file
287
pkg/uploader/kopia/snapshot.go
Normal file
@@ -0,0 +1,287 @@
|
||||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kopia
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
"github.com/kopia/kopia/fs/localfs"
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/manifest"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
"github.com/kopia/kopia/snapshot/restore"
|
||||
"github.com/kopia/kopia/snapshot/snapshotfs"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
//All function mainly used to make testing more convenient
|
||||
var treeForSourceFunc = policy.TreeForSource
|
||||
var applyRetentionPolicyFunc = policy.ApplyRetentionPolicy
|
||||
var setPolicyFunc = policy.SetPolicy
|
||||
var saveSnapshotFunc = snapshot.SaveSnapshot
|
||||
var loadSnapshotFunc = snapshot.LoadSnapshot
|
||||
|
||||
//SnapshotUploader which mainly used for UT test that could overwrite Upload interface
|
||||
type SnapshotUploader interface {
|
||||
Upload(
|
||||
ctx context.Context,
|
||||
source fs.Entry,
|
||||
policyTree *policy.Tree,
|
||||
sourceInfo snapshot.SourceInfo,
|
||||
previousManifests ...*snapshot.Manifest,
|
||||
) (*snapshot.Manifest, error)
|
||||
}
|
||||
|
||||
func newOptionalInt(b policy.OptionalInt) *policy.OptionalInt {
|
||||
return &b
|
||||
}
|
||||
|
||||
//setupDefaultPolicy set default policy for kopia
|
||||
func setupDefaultPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceInfo snapshot.SourceInfo) error {
|
||||
return setPolicyFunc(ctx, rep, sourceInfo, &policy.Policy{
|
||||
RetentionPolicy: policy.RetentionPolicy{
|
||||
KeepLatest: newOptionalInt(math.MaxInt32),
|
||||
},
|
||||
CompressionPolicy: policy.CompressionPolicy{
|
||||
CompressorName: "none",
|
||||
},
|
||||
UploadPolicy: policy.UploadPolicy{
|
||||
MaxParallelFileReads: newOptionalInt(policy.OptionalInt(runtime.NumCPU())),
|
||||
},
|
||||
SchedulingPolicy: policy.SchedulingPolicy{
|
||||
Manual: true,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
//Backup backup specific sourcePath and update progress
|
||||
func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string,
|
||||
parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
|
||||
if fsUploader == nil {
|
||||
return nil, errors.New("get empty kopia uploader")
|
||||
}
|
||||
dir, err := filepath.Abs(sourcePath)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "Invalid source path '%s'", sourcePath)
|
||||
}
|
||||
|
||||
sourceInfo := snapshot.SourceInfo{
|
||||
UserName: udmrepo.GetRepoUser(),
|
||||
Host: udmrepo.GetRepoDomain(),
|
||||
Path: filepath.Clean(dir),
|
||||
}
|
||||
|
||||
rootDir, err := getLocalFSEntry(sourceInfo.Path)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Unable to get local filesystem entry")
|
||||
}
|
||||
snapID, snapshotSize, err := SnapshotSource(ctx, repoWriter, fsUploader, sourceInfo, rootDir, parentSnapshot, log, "Kopia Uploader")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
snapshotInfo := &uploader.SnapshotInfo{
|
||||
ID: snapID,
|
||||
Size: snapshotSize,
|
||||
}
|
||||
|
||||
return snapshotInfo, nil
|
||||
}
|
||||
|
||||
func getLocalFSEntry(path0 string) (fs.Entry, error) {
|
||||
path, err := resolveSymlink(path0)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "resolveSymlink")
|
||||
}
|
||||
|
||||
e, err := localfs.NewEntry(path)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't get local fs entry")
|
||||
}
|
||||
|
||||
return e, nil
|
||||
}
|
||||
|
||||
//resolveSymlink returns the path name after the evaluation of any symbolic links
|
||||
func resolveSymlink(path string) (string, error) {
|
||||
st, err := os.Lstat(path)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "stat")
|
||||
}
|
||||
|
||||
if (st.Mode() & os.ModeSymlink) == 0 {
|
||||
return path, nil
|
||||
}
|
||||
|
||||
return filepath.EvalSymlinks(path)
|
||||
}
|
||||
|
||||
//SnapshotSource which setup policy for snapshot, upload snapshot, update progress
|
||||
func SnapshotSource(
|
||||
ctx context.Context,
|
||||
rep repo.RepositoryWriter,
|
||||
u SnapshotUploader,
|
||||
sourceInfo snapshot.SourceInfo,
|
||||
rootDir fs.Entry,
|
||||
parentSnapshot string,
|
||||
log logrus.FieldLogger,
|
||||
description string,
|
||||
) (string, int64, error) {
|
||||
log.Info("Start to snapshot...")
|
||||
snapshotStartTime := time.Now()
|
||||
|
||||
var previous []*snapshot.Manifest
|
||||
if parentSnapshot != "" {
|
||||
mani, err := loadSnapshotFunc(ctx, rep, manifest.ID(parentSnapshot))
|
||||
if err != nil {
|
||||
return "", 0, errors.Wrapf(err, "Failed to load previous snapshot %v from kopia", parentSnapshot)
|
||||
}
|
||||
|
||||
previous = append(previous, mani)
|
||||
} else {
|
||||
pre, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo, nil)
|
||||
if err != nil {
|
||||
return "", 0, errors.Wrapf(err, "Failed to find previous kopia snapshot manifests for si %v", sourceInfo)
|
||||
}
|
||||
|
||||
previous = pre
|
||||
}
|
||||
var manifest *snapshot.Manifest
|
||||
if err := setupDefaultPolicy(ctx, rep, sourceInfo); err != nil {
|
||||
return "", 0, errors.Wrapf(err, "unable to set policy for si %v", sourceInfo)
|
||||
}
|
||||
|
||||
policyTree, err := treeForSourceFunc(ctx, rep, sourceInfo)
|
||||
if err != nil {
|
||||
return "", 0, errors.Wrapf(err, "unable to create policy getter for si %v", sourceInfo)
|
||||
}
|
||||
|
||||
manifest, err = u.Upload(ctx, rootDir, policyTree, sourceInfo, previous...)
|
||||
if err != nil {
|
||||
return "", 0, errors.Wrapf(err, "Failed to upload the kopia snapshot for si %v", sourceInfo)
|
||||
}
|
||||
|
||||
manifest.Description = description
|
||||
|
||||
if _, err = saveSnapshotFunc(ctx, rep, manifest); err != nil {
|
||||
return "", 0, errors.Wrapf(err, "Failed to save kopia manifest %v", manifest.ID)
|
||||
}
|
||||
_, err = applyRetentionPolicyFunc(ctx, rep, sourceInfo, true)
|
||||
if err != nil {
|
||||
return "", 0, errors.Wrapf(err, "Failed to apply kopia retention policy for si %v", sourceInfo)
|
||||
}
|
||||
if err = rep.Flush(ctx); err != nil {
|
||||
return "", 0, errors.Wrapf(err, "Failed to flush kopia repository")
|
||||
}
|
||||
log.Infof("Created snapshot with root %v and ID %v in %v", manifest.RootObjectID(), manifest.ID, time.Since(snapshotStartTime).Truncate(time.Second))
|
||||
return reportSnapshotStatus(manifest)
|
||||
}
|
||||
|
||||
func reportSnapshotStatus(manifest *snapshot.Manifest) (string, int64, error) {
|
||||
manifestID := manifest.ID
|
||||
snapSize := manifest.Stats.TotalFileSize
|
||||
|
||||
var errs []string
|
||||
if ds := manifest.RootEntry.DirSummary; ds != nil {
|
||||
for _, ent := range ds.FailedEntries {
|
||||
errs = append(errs, ent.Error)
|
||||
}
|
||||
}
|
||||
if len(errs) != 0 {
|
||||
return "", 0, errors.New(strings.Join(errs, "\n"))
|
||||
}
|
||||
|
||||
return string(manifestID), snapSize, nil
|
||||
}
|
||||
|
||||
// findPreviousSnapshotManifest returns the list of previous snapshots for a given source, including
|
||||
// last complete snapshot following it.
|
||||
func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sourceInfo snapshot.SourceInfo, noLaterThan *time.Time) ([]*snapshot.Manifest, error) {
|
||||
man, err := snapshot.ListSnapshots(ctx, rep, sourceInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var previousComplete *snapshot.Manifest
|
||||
var result []*snapshot.Manifest
|
||||
|
||||
for _, p := range man {
|
||||
if noLaterThan != nil && p.StartTime.After(*noLaterThan) {
|
||||
continue
|
||||
}
|
||||
|
||||
if p.IncompleteReason == "" && (previousComplete == nil || p.StartTime.After(previousComplete.StartTime)) {
|
||||
previousComplete = p
|
||||
}
|
||||
}
|
||||
|
||||
if previousComplete != nil {
|
||||
result = append(result, previousComplete)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
//Restore restore specific sourcePath with given snapshotID and update progress
|
||||
func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
|
||||
log.Info("Start to restore...")
|
||||
|
||||
rootEntry, err := snapshotfs.FilesystemEntryFromIDWithPath(ctx, rep, snapshotID, false)
|
||||
if err != nil {
|
||||
return 0, 0, errors.Wrapf(err, "Unable to get filesystem entry for snapshot %v", snapshotID)
|
||||
}
|
||||
|
||||
path, err := filepath.Abs(dest)
|
||||
if err != nil {
|
||||
return 0, 0, errors.Wrapf(err, "Unable to resolve path %v", dest)
|
||||
}
|
||||
|
||||
output := &restore.FilesystemOutput{
|
||||
TargetPath: path,
|
||||
OverwriteDirectories: true,
|
||||
OverwriteFiles: true,
|
||||
OverwriteSymlinks: true,
|
||||
IgnorePermissionErrors: true,
|
||||
}
|
||||
|
||||
stat, err := restore.Entry(ctx, rep, output, rootEntry, restore.Options{
|
||||
Parallel: runtime.NumCPU(),
|
||||
RestoreDirEntryAtDepth: math.MaxInt32,
|
||||
Cancel: cancleCh,
|
||||
ProgressCallback: func(ctx context.Context, stats restore.Stats) {
|
||||
progress.ProgressBytes(stats.RestoredTotalFileSize, stats.EnqueuedTotalFileSize)
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return 0, 0, errors.Wrapf(err, "Failed to copy snapshot data to the target")
|
||||
}
|
||||
return stat.RestoredTotalFileSize, stat.RestoredFileCount, nil
|
||||
}
|
||||
198
pkg/uploader/kopia/snapshot_test.go
Normal file
198
pkg/uploader/kopia/snapshot_test.go
Normal file
@@ -0,0 +1,198 @@
|
||||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kopia
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
repomocks "github.com/vmware-tanzu/velero/pkg/repository/mocks"
|
||||
uploadermocks "github.com/vmware-tanzu/velero/pkg/uploader/mocks"
|
||||
)
|
||||
|
||||
type snapshotMockes struct {
|
||||
policyMock *uploadermocks.Policy
|
||||
snapshotMock *uploadermocks.Snapshot
|
||||
uploderMock *uploadermocks.Uploader
|
||||
repoWriterMock *repomocks.RepositoryWriter
|
||||
}
|
||||
|
||||
type mockArgs struct {
|
||||
methodName string
|
||||
returns []interface{}
|
||||
}
|
||||
|
||||
func InjectSnapshotFuncs() *snapshotMockes {
|
||||
s := &snapshotMockes{
|
||||
policyMock: &uploadermocks.Policy{},
|
||||
snapshotMock: &uploadermocks.Snapshot{},
|
||||
uploderMock: &uploadermocks.Uploader{},
|
||||
repoWriterMock: &repomocks.RepositoryWriter{},
|
||||
}
|
||||
|
||||
setPolicyFunc = s.policyMock.SetPolicy
|
||||
treeForSourceFunc = s.policyMock.TreeForSource
|
||||
applyRetentionPolicyFunc = s.policyMock.ApplyRetentionPolicy
|
||||
loadSnapshotFunc = s.snapshotMock.LoadSnapshot
|
||||
saveSnapshotFunc = s.snapshotMock.SaveSnapshot
|
||||
return s
|
||||
}
|
||||
|
||||
func MockFuncs(s *snapshotMockes, args []mockArgs) {
|
||||
s.snapshotMock.On("LoadSnapshot", mock.Anything, mock.Anything, mock.Anything).Return(args[0].returns...)
|
||||
s.snapshotMock.On("SaveSnapshot", mock.Anything, mock.Anything, mock.Anything).Return(args[1].returns...)
|
||||
s.policyMock.On("TreeForSource", mock.Anything, mock.Anything, mock.Anything).Return(args[2].returns...)
|
||||
s.policyMock.On("ApplyRetentionPolicy", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(args[3].returns...)
|
||||
s.policyMock.On("SetPolicy", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(args[4].returns...)
|
||||
s.uploderMock.On("Upload", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(args[5].returns...)
|
||||
s.repoWriterMock.On("Flush", mock.Anything).Return(args[6].returns...)
|
||||
}
|
||||
|
||||
func TestSnapshotSource(t *testing.T) {
|
||||
|
||||
ctx := context.TODO()
|
||||
sourceInfo := snapshot.SourceInfo{
|
||||
UserName: "testUserName",
|
||||
Host: "testHost",
|
||||
Path: "/var",
|
||||
}
|
||||
rootDir, err := getLocalFSEntry(sourceInfo.Path)
|
||||
assert.NoError(t, err)
|
||||
log := logrus.New()
|
||||
manifest := &snapshot.Manifest{
|
||||
ID: "test",
|
||||
RootEntry: &snapshot.DirEntry{},
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
args []mockArgs
|
||||
notError bool
|
||||
}{
|
||||
{
|
||||
name: "regular test",
|
||||
args: []mockArgs{
|
||||
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
|
||||
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}},
|
||||
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
|
||||
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}},
|
||||
{methodName: "SetPolicy", returns: []interface{}{nil}},
|
||||
{methodName: "Upload", returns: []interface{}{manifest, nil}},
|
||||
{methodName: "Flush", returns: []interface{}{nil}},
|
||||
},
|
||||
notError: true,
|
||||
},
|
||||
{
|
||||
name: "failed to load snapshot",
|
||||
args: []mockArgs{
|
||||
{methodName: "LoadSnapshot", returns: []interface{}{manifest, errors.New("failed to load snapshot")}},
|
||||
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}},
|
||||
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
|
||||
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}},
|
||||
{methodName: "SetPolicy", returns: []interface{}{nil}},
|
||||
{methodName: "Upload", returns: []interface{}{manifest, nil}},
|
||||
{methodName: "Flush", returns: []interface{}{nil}},
|
||||
},
|
||||
notError: false,
|
||||
},
|
||||
{
|
||||
name: "failed to save snapshot",
|
||||
args: []mockArgs{
|
||||
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
|
||||
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, errors.New("failed to save snapshot")}},
|
||||
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
|
||||
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}},
|
||||
{methodName: "SetPolicy", returns: []interface{}{nil}},
|
||||
{methodName: "Upload", returns: []interface{}{manifest, nil}},
|
||||
{methodName: "Flush", returns: []interface{}{nil}},
|
||||
},
|
||||
notError: false,
|
||||
},
|
||||
{
|
||||
name: "failed to apply policy",
|
||||
args: []mockArgs{
|
||||
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
|
||||
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}},
|
||||
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
|
||||
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, errors.New("failed to save snapshot")}},
|
||||
{methodName: "SetPolicy", returns: []interface{}{nil}},
|
||||
{methodName: "Upload", returns: []interface{}{manifest, nil}},
|
||||
{methodName: "Flush", returns: []interface{}{nil}},
|
||||
},
|
||||
notError: false,
|
||||
},
|
||||
{
|
||||
name: "failed to set policy",
|
||||
args: []mockArgs{
|
||||
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
|
||||
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}},
|
||||
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
|
||||
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}},
|
||||
{methodName: "SetPolicy", returns: []interface{}{errors.New("failed to set policy")}},
|
||||
{methodName: "Upload", returns: []interface{}{manifest, nil}},
|
||||
{methodName: "Flush", returns: []interface{}{nil}},
|
||||
},
|
||||
notError: false,
|
||||
},
|
||||
{
|
||||
name: "failed to upload snapshot",
|
||||
args: []mockArgs{
|
||||
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
|
||||
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}},
|
||||
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
|
||||
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}},
|
||||
{methodName: "SetPolicy", returns: []interface{}{nil}},
|
||||
{methodName: "Upload", returns: []interface{}{manifest, errors.New("failed to upload snapshot")}},
|
||||
{methodName: "Flush", returns: []interface{}{nil}},
|
||||
},
|
||||
notError: false,
|
||||
},
|
||||
{
|
||||
name: "failed to flush repo",
|
||||
args: []mockArgs{
|
||||
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
|
||||
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, errors.New("failed to save snapshot")}},
|
||||
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
|
||||
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}},
|
||||
{methodName: "SetPolicy", returns: []interface{}{nil}},
|
||||
{methodName: "Upload", returns: []interface{}{manifest, nil}},
|
||||
{methodName: "Flush", returns: []interface{}{errors.New("failed to flush repo")}},
|
||||
},
|
||||
notError: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
s := InjectSnapshotFuncs()
|
||||
MockFuncs(s, tc.args)
|
||||
_, _, err = SnapshotSource(ctx, s.repoWriterMock, s.uploderMock, sourceInfo, rootDir, "/", log, "TestSnapshotSource")
|
||||
if tc.notError {
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
assert.Error(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
92
pkg/uploader/mocks/policy.go
Normal file
92
pkg/uploader/mocks/policy.go
Normal file
@@ -0,0 +1,92 @@
|
||||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
"github.com/kopia/kopia/repo"
|
||||
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
)
|
||||
|
||||
// policy is an autogenerated mock type for the TreeForSource type
|
||||
type Policy struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// Execute provides a mock function with given fields: ctx, rep, si
|
||||
func (_m *Policy) TreeForSource(ctx context.Context, rep repo.Repository, si snapshot.SourceInfo) (*policy.Tree, error) {
|
||||
ret := _m.Called(ctx, rep, si)
|
||||
|
||||
var r0 *policy.Tree
|
||||
if rf, ok := ret.Get(0).(func(context.Context, repo.Repository, snapshot.SourceInfo) *policy.Tree); ok {
|
||||
r0 = rf(ctx, rep, si)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*policy.Tree)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, repo.Repository, snapshot.SourceInfo) error); ok {
|
||||
r1 = rf(ctx, rep, si)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// ApplyRetentionPolicy provides a mock function with given fields: ctx, rep, sourceInfo, reallyDelete
|
||||
func (_m *Policy) ApplyRetentionPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceInfo snapshot.SourceInfo, reallyDelete bool) ([]*snapshot.Manifest, error) {
|
||||
ret := _m.Called(ctx, rep, sourceInfo, reallyDelete)
|
||||
|
||||
var r0 []*snapshot.Manifest
|
||||
if rf, ok := ret.Get(0).(func(context.Context, repo.RepositoryWriter, snapshot.SourceInfo, bool) []*snapshot.Manifest); ok {
|
||||
r0 = rf(ctx, rep, sourceInfo, reallyDelete)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]*snapshot.Manifest)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, repo.RepositoryWriter, snapshot.SourceInfo, bool) error); ok {
|
||||
r1 = rf(ctx, rep, sourceInfo, reallyDelete)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
func (_m *Policy) SetPolicy(ctx context.Context, rep repo.RepositoryWriter, si snapshot.SourceInfo, pol *policy.Policy) error {
|
||||
ret := _m.Called(ctx, rep, si, pol)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, repo.RepositoryWriter, snapshot.SourceInfo, *policy.Policy) error); ok {
|
||||
r0 = rf(ctx, rep, si, pol)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
42
pkg/uploader/mocks/shim.go
Normal file
42
pkg/uploader/mocks/shim.go
Normal file
@@ -0,0 +1,42 @@
|
||||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
context "context"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
// shimRepository is an autogenerated mock type for the shimRepository type
|
||||
type ShimRepository struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// Flush provides a mock function with given fields: ctx
|
||||
func (_m *ShimRepository) Flush(ctx context.Context) error {
|
||||
ret := _m.Called(ctx)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
|
||||
r0 = rf(ctx)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
76
pkg/uploader/mocks/snapshot.go
Normal file
76
pkg/uploader/mocks/snapshot.go
Normal file
@@ -0,0 +1,76 @@
|
||||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/kopia/kopia/repo/manifest"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
"github.com/kopia/kopia/repo"
|
||||
)
|
||||
|
||||
// snapshot is an autogenerated mock type for the snapshot type
|
||||
type Snapshot struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// LoadSnapshot provides a mock function with given fields: ctx, rep, manifestID
|
||||
func (_m *Snapshot) LoadSnapshot(ctx context.Context, rep repo.Repository, manifestID manifest.ID) (*snapshot.Manifest, error) {
|
||||
ret := _m.Called(ctx, rep, manifestID)
|
||||
|
||||
var r0 *snapshot.Manifest
|
||||
if rf, ok := ret.Get(0).(func(context.Context, repo.Repository, manifest.ID) *snapshot.Manifest); ok {
|
||||
r0 = rf(ctx, rep, manifestID)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*snapshot.Manifest)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, repo.Repository, manifest.ID) error); ok {
|
||||
r1 = rf(ctx, rep, manifestID)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// SaveSnapshot provides a mock function with given fields: ctx, rep, man
|
||||
func (_m *Snapshot) SaveSnapshot(ctx context.Context, rep repo.RepositoryWriter, man *snapshot.Manifest) (manifest.ID, error) {
|
||||
ret := _m.Called(ctx, rep, man)
|
||||
|
||||
var r0 manifest.ID
|
||||
if rf, ok := ret.Get(0).(func(context.Context, repo.RepositoryWriter, *snapshot.Manifest) manifest.ID); ok {
|
||||
r0 = rf(ctx, rep, man)
|
||||
} else {
|
||||
r0 = ret.Get(0).(manifest.ID)
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, repo.RepositoryWriter, *snapshot.Manifest) error); ok {
|
||||
r1 = rf(ctx, rep, man)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
63
pkg/uploader/mocks/uploader.go
Normal file
63
pkg/uploader/mocks/uploader.go
Normal file
@@ -0,0 +1,63 @@
|
||||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
)
|
||||
|
||||
// Upload is an autogenerated mock type for the Upload type
|
||||
type Uploader struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// Execute provides a mock function with given fields: ctx, source, policyTree, sourceInfo, previousManifests
|
||||
func (_m *Uploader) Upload(ctx context.Context, source fs.Entry, policyTree *policy.Tree, sourceInfo snapshot.SourceInfo, previousManifests ...*snapshot.Manifest) (*snapshot.Manifest, error) {
|
||||
_va := make([]interface{}, len(previousManifests))
|
||||
for _i := range previousManifests {
|
||||
_va[_i] = previousManifests[_i]
|
||||
}
|
||||
var _ca []interface{}
|
||||
_ca = append(_ca, ctx, source, policyTree, sourceInfo)
|
||||
_ca = append(_ca, _va...)
|
||||
ret := _m.Called(_ca...)
|
||||
|
||||
var r0 *snapshot.Manifest
|
||||
if rf, ok := ret.Get(0).(func(context.Context, fs.Entry, *policy.Tree, snapshot.SourceInfo, ...*snapshot.Manifest) *snapshot.Manifest); ok {
|
||||
r0 = rf(ctx, source, policyTree, sourceInfo, previousManifests...)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*snapshot.Manifest)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, fs.Entry, *policy.Tree, snapshot.SourceInfo, ...*snapshot.Manifest) error); ok {
|
||||
r1 = rf(ctx, source, policyTree, sourceInfo, previousManifests...)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
208
pkg/uploader/provider/kopia.go
Normal file
208
pkg/uploader/provider/kopia.go
Normal file
@@ -0,0 +1,208 @@
|
||||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/kopia/kopia/snapshot/snapshotfs"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader/kopia"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
repokeys "github.com/vmware-tanzu/velero/pkg/repository/keys"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo/service"
|
||||
)
|
||||
|
||||
//BackupFunc mainly used to make testing more convenient
|
||||
var BackupFunc = kopia.Backup
|
||||
var RestoreFunc = kopia.Restore
|
||||
|
||||
//kopiaProvider recorded info related with kopiaProvider
|
||||
type kopiaProvider struct {
|
||||
bkRepo udmrepo.BackupRepo
|
||||
credGetter *credentials.CredentialGetter
|
||||
log logrus.FieldLogger
|
||||
}
|
||||
|
||||
//NewKopiaUploaderProvider initialized with open or create a repository
|
||||
func NewKopiaUploaderProvider(
|
||||
ctx context.Context,
|
||||
credGetter *credentials.CredentialGetter,
|
||||
backupRepo *velerov1api.BackupRepository,
|
||||
log logrus.FieldLogger,
|
||||
) (Provider, error) {
|
||||
kp := &kopiaProvider{
|
||||
log: log,
|
||||
credGetter: credGetter,
|
||||
}
|
||||
//repoUID which is used to generate kopia repository config with unique directory path
|
||||
repoUID := string(backupRepo.GetUID())
|
||||
repoOpt, err := udmrepo.NewRepoOptions(
|
||||
udmrepo.WithPassword(kp, ""),
|
||||
udmrepo.WithConfigFile("", repoUID),
|
||||
udmrepo.WithDescription("Initial kopia uploader provider"),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "error to get repo options")
|
||||
}
|
||||
|
||||
repoSvc := service.Create(log)
|
||||
log.WithField("repoUID", repoUID).Info("Opening backup repo")
|
||||
|
||||
kp.bkRepo, err = repoSvc.Open(ctx, *repoOpt)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "Failed to find kopia repository")
|
||||
}
|
||||
return kp, nil
|
||||
}
|
||||
|
||||
//CheckContext check context status check if context is timeout or cancel and backup restore once finished it will quit and return
|
||||
func (kp *kopiaProvider) CheckContext(ctx context.Context, finishChan chan struct{}, restoreChan chan struct{}, uploader *snapshotfs.Uploader) {
|
||||
select {
|
||||
case <-finishChan:
|
||||
kp.log.Infof("Action finished")
|
||||
return
|
||||
case <-ctx.Done():
|
||||
if uploader != nil {
|
||||
uploader.Cancel()
|
||||
kp.log.Infof("Backup is been canceled")
|
||||
}
|
||||
if restoreChan != nil {
|
||||
close(restoreChan)
|
||||
kp.log.Infof("Restore is been canceled")
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (kp *kopiaProvider) Close(ctx context.Context) {
|
||||
kp.bkRepo.Close(ctx)
|
||||
}
|
||||
|
||||
//RunBackup which will backup specific path and update backup progress
|
||||
func (kp *kopiaProvider) RunBackup(
|
||||
ctx context.Context,
|
||||
path string,
|
||||
tags map[string]string,
|
||||
parentSnapshot string,
|
||||
updater uploader.ProgressUpdater) (string, error) {
|
||||
if updater == nil {
|
||||
return "", errors.New("Need to initial backup progress updater first")
|
||||
}
|
||||
|
||||
log := kp.log.WithFields(logrus.Fields{
|
||||
"path": path,
|
||||
"parentSnapshot": parentSnapshot,
|
||||
})
|
||||
repoWriter := kopia.NewShimRepo(kp.bkRepo)
|
||||
kpUploader := snapshotfs.NewUploader(repoWriter)
|
||||
prorgess := new(kopia.KopiaProgress)
|
||||
prorgess.InitThrottle(backupProgressCheckInterval)
|
||||
prorgess.Updater = updater
|
||||
kpUploader.Progress = prorgess
|
||||
quit := make(chan struct{})
|
||||
log.Info("Starting backup")
|
||||
go kp.CheckContext(ctx, quit, nil, kpUploader)
|
||||
|
||||
defer func() {
|
||||
close(quit)
|
||||
}()
|
||||
|
||||
snapshotInfo, err := BackupFunc(ctx, kpUploader, repoWriter, path, parentSnapshot, log)
|
||||
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(err, "Failed to run kopia backup")
|
||||
} else if snapshotInfo == nil {
|
||||
return "", fmt.Errorf("failed to get kopia backup snapshot info for path %v", path)
|
||||
}
|
||||
// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
|
||||
updater.UpdateProgress(
|
||||
&uploader.UploaderProgress{
|
||||
TotalBytes: snapshotInfo.Size,
|
||||
BytesDone: snapshotInfo.Size,
|
||||
},
|
||||
)
|
||||
|
||||
log.Debugf("Kopia backup finished, snapshot ID %s, backup size %d", snapshotInfo.ID, snapshotInfo.Size)
|
||||
return snapshotInfo.ID, nil
|
||||
}
|
||||
|
||||
func (kp *kopiaProvider) GetPassword(param interface{}) (string, error) {
|
||||
if kp.credGetter.FromSecret == nil {
|
||||
return "", errors.New("invalid credentials interface")
|
||||
}
|
||||
rawPass, err := kp.credGetter.FromSecret.Get(repokeys.RepoKeySelector())
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "error to get password")
|
||||
}
|
||||
|
||||
return strings.TrimSpace(rawPass), nil
|
||||
}
|
||||
|
||||
//RunRestore which will restore specific path and update restore progress
|
||||
func (kp *kopiaProvider) RunRestore(
|
||||
ctx context.Context,
|
||||
snapshotID string,
|
||||
volumePath string,
|
||||
updater uploader.ProgressUpdater) error {
|
||||
log := kp.log.WithFields(logrus.Fields{
|
||||
"snapshotID": snapshotID,
|
||||
"volumePath": volumePath,
|
||||
})
|
||||
repoWriter := kopia.NewShimRepo(kp.bkRepo)
|
||||
prorgess := new(kopia.KopiaProgress)
|
||||
prorgess.InitThrottle(restoreProgressCheckInterval)
|
||||
prorgess.Updater = updater
|
||||
restoreCancel := make(chan struct{})
|
||||
quit := make(chan struct{})
|
||||
|
||||
log.Info("Starting restore")
|
||||
go kp.CheckContext(ctx, quit, restoreCancel, nil)
|
||||
|
||||
defer func() {
|
||||
if restoreCancel != nil {
|
||||
close(restoreCancel)
|
||||
}
|
||||
close(quit)
|
||||
}()
|
||||
|
||||
size, fileCount, err := RestoreFunc(ctx, repoWriter, prorgess, snapshotID, volumePath, log, restoreCancel)
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Failed to run kopia restore")
|
||||
}
|
||||
|
||||
// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
|
||||
updater.UpdateProgress(&uploader.UploaderProgress{
|
||||
TotalBytes: size,
|
||||
BytesDone: size,
|
||||
})
|
||||
|
||||
output := fmt.Sprintf("Kopia restore finished, restore size %d, file count %d", size, fileCount)
|
||||
|
||||
log.Info(output)
|
||||
|
||||
return nil
|
||||
}
|
||||
118
pkg/uploader/provider/kopia_test.go
Normal file
118
pkg/uploader/provider/kopia_test.go
Normal file
@@ -0,0 +1,118 @@
|
||||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/snapshot/snapshotfs"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/controller"
|
||||
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader/kopia"
|
||||
)
|
||||
|
||||
func TestRunBackup(t *testing.T) {
|
||||
var kp kopiaProvider
|
||||
kp.log = logrus.New()
|
||||
updater := controller.BackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)}
|
||||
testCases := []struct {
|
||||
name string
|
||||
hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error)
|
||||
notError bool
|
||||
}{
|
||||
{
|
||||
name: "success to backup",
|
||||
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
|
||||
return &uploader.SnapshotInfo{}, nil
|
||||
},
|
||||
notError: true,
|
||||
},
|
||||
{
|
||||
name: "get error to backup",
|
||||
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
|
||||
return &uploader.SnapshotInfo{}, errors.New("failed to backup")
|
||||
},
|
||||
notError: false,
|
||||
},
|
||||
{
|
||||
name: "got empty snapshot",
|
||||
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
|
||||
return nil, nil
|
||||
},
|
||||
notError: false,
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
BackupFunc = tc.hookBackupFunc
|
||||
_, err := kp.RunBackup(context.Background(), "var", nil, "", &updater)
|
||||
if tc.notError {
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
assert.Error(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunRestore(t *testing.T) {
|
||||
var kp kopiaProvider
|
||||
kp.log = logrus.New()
|
||||
updater := controller.RestoreProgressUpdater{PodVolumeRestore: &velerov1api.PodVolumeRestore{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewFakeClientWithScheme(scheme.Scheme)}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
hookRestoreFunc func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error)
|
||||
notError bool
|
||||
}{
|
||||
{
|
||||
name: "normal restore",
|
||||
hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
|
||||
return 0, 0, nil
|
||||
},
|
||||
notError: true,
|
||||
},
|
||||
{
|
||||
name: "failed to restore",
|
||||
hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
|
||||
return 0, 0, errors.New("failed to restore")
|
||||
},
|
||||
notError: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
RestoreFunc = tc.hookRestoreFunc
|
||||
err := kp.RunRestore(context.Background(), "", "/var", &updater)
|
||||
if tc.notError {
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
assert.Error(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -18,27 +18,54 @@ package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/uploader"
|
||||
)
|
||||
|
||||
const restoreProgressCheckInterval = 10 * time.Second
|
||||
const backupProgressCheckInterval = 10 * time.Second
|
||||
|
||||
// Provider which is designed for one pod volumn to do the backup or restore
|
||||
type Provider interface {
|
||||
// RunBackup which will do backup for one specific volumn and return snapshotID error
|
||||
// updateFunc which is used for update backup progress into related pvb status
|
||||
// updater is used for updating backup progress which implement by third-party
|
||||
RunBackup(
|
||||
ctx context.Context,
|
||||
path string,
|
||||
tags map[string]string,
|
||||
parentSnapshot string,
|
||||
updateFunc func(velerov1api.PodVolumeOperationProgress)) (string, error)
|
||||
updater uploader.ProgressUpdater) (string, error)
|
||||
// RunRestore which will do restore for one specific volumn with given snapshot id and return error
|
||||
// updateFunc which is used for update restore progress into related pvr status
|
||||
// updater is used for updating backup progress which implement by third-party
|
||||
RunRestore(
|
||||
ctx context.Context,
|
||||
snapshotID string,
|
||||
volumePath string,
|
||||
updateFunc func(velerov1api.PodVolumeOperationProgress)) error
|
||||
updater uploader.ProgressUpdater) error
|
||||
// Close which will close related repository
|
||||
Close(ctx context.Context)
|
||||
}
|
||||
|
||||
// NewUploaderProvider initialize provider with specific uploaderType
|
||||
func NewUploaderProvider(
|
||||
ctx context.Context,
|
||||
uploaderType string,
|
||||
repoIdentifier string,
|
||||
bsl *velerov1api.BackupStorageLocation,
|
||||
backupReo *velerov1api.BackupRepository,
|
||||
credGetter *credentials.CredentialGetter,
|
||||
repoKeySelector *v1.SecretKeySelector,
|
||||
log logrus.FieldLogger,
|
||||
) (Provider, error) {
|
||||
if uploaderType == uploader.KopiaType {
|
||||
return NewResticUploaderProvider(repoIdentifier, bsl, credGetter, repoKeySelector, log)
|
||||
} else {
|
||||
return NewKopiaUploaderProvider(ctx, credGetter, backupReo, log)
|
||||
}
|
||||
}
|
||||
|
||||
34
pkg/uploader/provider/restic.go
Normal file
34
pkg/uploader/provider/restic.go
Normal file
@@ -0,0 +1,34 @@
|
||||
/*
|
||||
Copyright The Velero Contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
package provider
|
||||
|
||||
import (
|
||||
"github.com/sirupsen/logrus"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/credentials"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
)
|
||||
|
||||
func NewResticUploaderProvider(
|
||||
repoIdentifier string,
|
||||
bsl *velerov1api.BackupStorageLocation,
|
||||
credGetter *credentials.CredentialGetter,
|
||||
repoKeySelector *v1.SecretKeySelector,
|
||||
log logrus.FieldLogger,
|
||||
) (Provider, error) {
|
||||
return nil, nil //TODO
|
||||
}
|
||||
@@ -22,10 +22,8 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
ResticType = "restic"
|
||||
KopiaType = "kopia"
|
||||
VeleroBackup = "backup"
|
||||
VeleroRestore = "restore"
|
||||
ResticType = "restic"
|
||||
KopiaType = "kopia"
|
||||
)
|
||||
|
||||
// ValidateUploaderType validates if the input param is a valid uploader type.
|
||||
@@ -43,7 +41,13 @@ type SnapshotInfo struct {
|
||||
Size int64 `json:"Size"`
|
||||
}
|
||||
|
||||
//UploaderProgress which defined two variables to record progress
|
||||
type UploaderProgress struct {
|
||||
TotalBytes int64 `json:"totalBytes,omitempty"`
|
||||
BytesDone int64 `json:"doneBytes,omitempty"`
|
||||
}
|
||||
|
||||
//UploaderProgress which defined generic interface to update progress
|
||||
type ProgressUpdater interface {
|
||||
UpdateProgress(p *UploaderProgress)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user