Uploader Implementation: Kopia backup and restore

Signed-off-by: Ming <mqiu@vmware.com>
This commit is contained in:
Ming
2022-08-08 07:10:04 +00:00
parent 4e25f59dc1
commit 262de19f52
19 changed files with 1582 additions and 44 deletions

View File

@@ -0,0 +1 @@
Uploader Implementation: Kopia backup and restore

1
go.sum
View File

@@ -547,6 +547,7 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubernetes-csi/external-snapshotter/client/v4 v4.2.0 h1:nHHjmvjitIiyPlUHk/ofpgvBcNcawJLtf4PYHORLjAA=
github.com/kubernetes-csi/external-snapshotter/client/v4 v4.2.0/go.mod h1:YBCo4DoEeDndqvAn6eeu0vWM7QdXmHEeI9cFWplmBys=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=

View File

@@ -16,6 +16,13 @@ limitations under the License.
package v1
import (
"context"
"github.com/sirupsen/logrus"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// PodVolumeOperationProgress represents the progress of a
// PodVolumeBackup/Restore (restic) operation
type PodVolumeOperationProgress struct {
@@ -25,3 +32,51 @@ type PodVolumeOperationProgress struct {
// +optional
BytesDone int64 `json:"bytesDone,omitempty"`
}
type BackupProgressUpdater struct {
pvb *PodVolumeBackup
log logrus.FieldLogger
ctx context.Context
cli client.Client
}
type RestoreProgressUpdater struct {
pvr *PodVolumeRestore
log logrus.FieldLogger
ctx context.Context
cli client.Client
}
func NewBackupProgressUpdater(pvb *PodVolumeBackup, log logrus.FieldLogger, ctx context.Context, cli client.Client) *BackupProgressUpdater {
return &BackupProgressUpdater{pvb, log, ctx, cli}
}
//UpdateProgress which implement ProgressUpdater to update pvb progress status
func (b *BackupProgressUpdater) UpdateProgress(p *UploaderProgress) {
original := b.pvb.DeepCopy()
b.pvb.Status.Progress = PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}
if b.cli == nil {
b.log.Errorf("failed to update backup pod %s volume %s progress with uninitailize client", b.pvb.Spec.Pod.Name, b.pvb.Spec.Volume)
return
}
if err := b.cli.Patch(b.ctx, b.pvb, client.MergeFrom(original)); err != nil {
b.log.Errorf("update backup pod %s volume %s progress with %v", b.pvb.Spec.Pod.Name, b.pvb.Spec.Volume, err)
}
}
func NewRestoreProgressUpdater(pvr *PodVolumeRestore, log logrus.FieldLogger, ctx context.Context, cli client.Client) *RestoreProgressUpdater {
return &RestoreProgressUpdater{pvr, log, ctx, cli}
}
//UpdateProgress which implement ProgressUpdater to update update pvb progress status
func (r *RestoreProgressUpdater) UpdateProgress(p *UploaderProgress) {
original := r.pvr.DeepCopy()
r.pvr.Status.Progress = PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}
if r.cli == nil {
r.log.Errorf("failed to update restore pod %s volume %s progress with uninitailize client", r.pvr.Spec.Pod.Name, r.pvr.Spec.Volume)
return
}
if err := r.cli.Patch(r.ctx, r.pvr, client.MergeFrom(original)); err != nil {
r.log.Errorf("update restore pod %s volume %s progress with %v", r.pvr.Spec.Pod.Name, r.pvr.Spec.Volume, err)
}
}

View File

@@ -0,0 +1,26 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
type UploaderProgress struct {
TotalBytes int64 `json:"totalBytes,omitempty"`
BytesDone int64 `json:"doneBytes,omitempty"`
}
type ProgressUpdater interface {
UpdateProgress(p *UploaderProgress)
}

View File

@@ -264,18 +264,6 @@ func (r *PodVolumeBackupReconciler) getParentSnapshot(ctx context.Context, log l
return mostRecentPVB.Status.SnapshotID
}
// updateBackupProgressFunc returns a func that takes progress info and patches
// the PVB with the new progress.
func (r *PodVolumeBackupReconciler) updateBackupProgressFunc(pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) {
return func(progress velerov1api.PodVolumeOperationProgress) {
original := pvb.DeepCopy()
pvb.Status.Progress = progress
if err := r.Client.Patch(context.Background(), pvb, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error update progress")
}
}
}
func (r *PodVolumeBackupReconciler) updateStatusToFailed(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
original := pvb.DeepCopy()
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed

View File

@@ -317,15 +317,3 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve
return nil
}
// updateRestoreProgressFunc returns a func that takes progress info and patches
// the PVR with the new progress
func (c *PodVolumeRestoreReconciler) updateRestoreProgressFunc(req *velerov1api.PodVolumeRestore, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) {
return func(progress velerov1api.PodVolumeOperationProgress) {
original := req.DeepCopy()
req.Status.Progress = progress
if err := c.Patch(context.Background(), req, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Unable to update PodVolumeRestore progress")
}
}
}

View File

@@ -0,0 +1,349 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mocks
import (
context "context"
index "github.com/kopia/kopia/repo/content/index"
manifest "github.com/kopia/kopia/repo/manifest"
mock "github.com/stretchr/testify/mock"
object "github.com/kopia/kopia/repo/object"
repo "github.com/kopia/kopia/repo"
time "time"
)
// RepositoryWriter is an autogenerated mock type for the RepositoryWriter type
type RepositoryWriter struct {
mock.Mock
}
// ClientOptions provides a mock function with given fields:
func (_m *RepositoryWriter) ClientOptions() repo.ClientOptions {
ret := _m.Called()
var r0 repo.ClientOptions
if rf, ok := ret.Get(0).(func() repo.ClientOptions); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(repo.ClientOptions)
}
return r0
}
// Close provides a mock function with given fields: ctx
func (_m *RepositoryWriter) Close(ctx context.Context) error {
ret := _m.Called(ctx)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// ContentInfo provides a mock function with given fields: ctx, contentID
func (_m *RepositoryWriter) ContentInfo(ctx context.Context, contentID index.ID) (index.Info, error) {
ret := _m.Called(ctx, contentID)
var r0 index.Info
if rf, ok := ret.Get(0).(func(context.Context, index.ID) index.Info); ok {
r0 = rf(ctx, contentID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(index.Info)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, index.ID) error); ok {
r1 = rf(ctx, contentID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DeleteManifest provides a mock function with given fields: ctx, id
func (_m *RepositoryWriter) DeleteManifest(ctx context.Context, id manifest.ID) error {
ret := _m.Called(ctx, id)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, manifest.ID) error); ok {
r0 = rf(ctx, id)
} else {
r0 = ret.Error(0)
}
return r0
}
// FindManifests provides a mock function with given fields: ctx, labels
func (_m *RepositoryWriter) FindManifests(ctx context.Context, labels map[string]string) ([]*manifest.EntryMetadata, error) {
ret := _m.Called(ctx, labels)
var r0 []*manifest.EntryMetadata
if rf, ok := ret.Get(0).(func(context.Context, map[string]string) []*manifest.EntryMetadata); ok {
r0 = rf(ctx, labels)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*manifest.EntryMetadata)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, map[string]string) error); ok {
r1 = rf(ctx, labels)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Flush provides a mock function with given fields: ctx
func (_m *RepositoryWriter) Flush(ctx context.Context) error {
ret := _m.Called(ctx)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// GetManifest provides a mock function with given fields: ctx, id, data
func (_m *RepositoryWriter) GetManifest(ctx context.Context, id manifest.ID, data interface{}) (*manifest.EntryMetadata, error) {
ret := _m.Called(ctx, id, data)
var r0 *manifest.EntryMetadata
if rf, ok := ret.Get(0).(func(context.Context, manifest.ID, interface{}) *manifest.EntryMetadata); ok {
r0 = rf(ctx, id, data)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*manifest.EntryMetadata)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, manifest.ID, interface{}) error); ok {
r1 = rf(ctx, id, data)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NewObjectWriter provides a mock function with given fields: ctx, opt
func (_m *RepositoryWriter) NewObjectWriter(ctx context.Context, opt object.WriterOptions) object.Writer {
ret := _m.Called(ctx, opt)
var r0 object.Writer
if rf, ok := ret.Get(0).(func(context.Context, object.WriterOptions) object.Writer); ok {
r0 = rf(ctx, opt)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(object.Writer)
}
}
return r0
}
// NewWriter provides a mock function with given fields: ctx, opt
func (_m *RepositoryWriter) NewWriter(ctx context.Context, opt repo.WriteSessionOptions) (context.Context, repo.RepositoryWriter, error) {
ret := _m.Called(ctx, opt)
var r0 context.Context
if rf, ok := ret.Get(0).(func(context.Context, repo.WriteSessionOptions) context.Context); ok {
r0 = rf(ctx, opt)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(context.Context)
}
}
var r1 repo.RepositoryWriter
if rf, ok := ret.Get(1).(func(context.Context, repo.WriteSessionOptions) repo.RepositoryWriter); ok {
r1 = rf(ctx, opt)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(repo.RepositoryWriter)
}
}
var r2 error
if rf, ok := ret.Get(2).(func(context.Context, repo.WriteSessionOptions) error); ok {
r2 = rf(ctx, opt)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// OpenObject provides a mock function with given fields: ctx, id
func (_m *RepositoryWriter) OpenObject(ctx context.Context, id object.ID) (object.Reader, error) {
ret := _m.Called(ctx, id)
var r0 object.Reader
if rf, ok := ret.Get(0).(func(context.Context, object.ID) object.Reader); ok {
r0 = rf(ctx, id)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(object.Reader)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, object.ID) error); ok {
r1 = rf(ctx, id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// PrefetchContents provides a mock function with given fields: ctx, contentIDs, hint
func (_m *RepositoryWriter) PrefetchContents(ctx context.Context, contentIDs []index.ID, hint string) []index.ID {
ret := _m.Called(ctx, contentIDs, hint)
var r0 []index.ID
if rf, ok := ret.Get(0).(func(context.Context, []index.ID, string) []index.ID); ok {
r0 = rf(ctx, contentIDs, hint)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]index.ID)
}
}
return r0
}
// PrefetchObjects provides a mock function with given fields: ctx, objectIDs, hint
func (_m *RepositoryWriter) PrefetchObjects(ctx context.Context, objectIDs []object.ID, hint string) ([]index.ID, error) {
ret := _m.Called(ctx, objectIDs, hint)
var r0 []index.ID
if rf, ok := ret.Get(0).(func(context.Context, []object.ID, string) []index.ID); ok {
r0 = rf(ctx, objectIDs, hint)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]index.ID)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, []object.ID, string) error); ok {
r1 = rf(ctx, objectIDs, hint)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// PutManifest provides a mock function with given fields: ctx, labels, payload
func (_m *RepositoryWriter) PutManifest(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error) {
ret := _m.Called(ctx, labels, payload)
var r0 manifest.ID
if rf, ok := ret.Get(0).(func(context.Context, map[string]string, interface{}) manifest.ID); ok {
r0 = rf(ctx, labels, payload)
} else {
r0 = ret.Get(0).(manifest.ID)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, map[string]string, interface{}) error); ok {
r1 = rf(ctx, labels, payload)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Refresh provides a mock function with given fields: ctx
func (_m *RepositoryWriter) Refresh(ctx context.Context) error {
ret := _m.Called(ctx)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// Time provides a mock function with given fields:
func (_m *RepositoryWriter) Time() time.Time {
ret := _m.Called()
var r0 time.Time
if rf, ok := ret.Get(0).(func() time.Time); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(time.Time)
}
return r0
}
// UpdateDescription provides a mock function with given fields: d
func (_m *RepositoryWriter) UpdateDescription(d string) {
_m.Called(d)
}
// VerifyObject provides a mock function with given fields: ctx, id
func (_m *RepositoryWriter) VerifyObject(ctx context.Context, id object.ID) ([]index.ID, error) {
ret := _m.Called(ctx, id)
var r0 []index.ID
if rf, ok := ret.Get(0).(func(context.Context, object.ID) []index.ID); ok {
r0 = rf(ctx, id)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]index.ID)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, object.ID) error); ok {
r1 = rf(ctx, id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

View File

@@ -20,7 +20,7 @@ import (
"sync/atomic"
"time"
"github.com/vmware-tanzu/velero/pkg/uploader"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)
//Throttle throttles controlle the interval of output result
@@ -60,9 +60,9 @@ type KopiaProgress struct {
estimatedFileCount int32 // +checklocksignore the total count of files to be processed
estimatedTotalBytes int64 // +checklocksignore the total size of files to be processed
// +checkatomic
processedBytes int64 // which statistic all bytes has been processed currently
outputThrottle Throttle // which control the frequency of update progress
UpFunc func(uploader.UploaderProgress) //which called by UpdateProgress func, it is used to update pvb or pvr status
processedBytes int64 // which statistic all bytes has been processed currently
outputThrottle Throttle // which control the frequency of update progress
Updater velerov1api.ProgressUpdater //which the kopia progress will call the UpdateProgress, the third party will implement the interface to update progress
}
//UploadedBytes the total bytes has uploaded currently
@@ -93,10 +93,7 @@ func (p *KopiaProgress) EstimatedDataSize(fileCount int, totalBytes int64) {
//UpdateProgress which called by UpdateProgress func, it is used to update pvb or pvr status
func (p *KopiaProgress) UpdateProgress() {
if p.outputThrottle.ShouldOutput() {
p.UpFunc(uploader.UploaderProgress{
TotalBytes: atomic.LoadInt64(&p.estimatedTotalBytes),
BytesDone: atomic.LoadInt64(&p.processedBytes),
})
p.Updater.UpdateProgress(&velerov1api.UploaderProgress{TotalBytes: p.estimatedTotalBytes, BytesDone: p.processedBytes})
}
}

View File

@@ -0,0 +1,286 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kopia
import (
"context"
"fmt"
"math"
"os"
"path/filepath"
"runtime"
"strings"
"time"
"github.com/sirupsen/logrus"
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo/service"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/localfs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/restore"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"
)
//All function mainly used to make testing more convenient
var treeForSourceFunc = policy.TreeForSource
var applyRetentionPolicyFunc = policy.ApplyRetentionPolicy
var setPolicyFunc = policy.SetPolicy
var saveSnapshotFunc = snapshot.SaveSnapshot
var loadSnapshotFunc = snapshot.LoadSnapshot
//SnapshotUploader which mainly used for UT test that could overwrite Upload interface
type SnapshotUploader interface {
Upload(
ctx context.Context,
source fs.Entry,
policyTree *policy.Tree,
sourceInfo snapshot.SourceInfo,
previousManifests ...*snapshot.Manifest,
) (*snapshot.Manifest, error)
}
func newOptionalInt(b policy.OptionalInt) *policy.OptionalInt {
return &b
}
//setupDefaultPolicy set default policy for kopia
func setupDefaultPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceInfo snapshot.SourceInfo) error {
return setPolicyFunc(ctx, rep, sourceInfo, &policy.Policy{
RetentionPolicy: policy.RetentionPolicy{
KeepLatest: newOptionalInt(math.MaxInt32),
},
CompressionPolicy: policy.CompressionPolicy{
CompressorName: "none",
},
UploadPolicy: policy.UploadPolicy{
MaxParallelFileReads: newOptionalInt(policy.OptionalInt(runtime.NumCPU())),
},
SchedulingPolicy: policy.SchedulingPolicy{
Manual: true,
},
})
}
//Backup backup specific sourcePath and update progress
func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string,
parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
if fsUploader == nil {
return nil, fmt.Errorf("get empty kopia uploader")
}
dir, err := filepath.Abs(sourcePath)
if err != nil {
return nil, errors.Wrapf(err, "Invalid source path '%s'", sourcePath)
}
sourceInfo := snapshot.SourceInfo{
Path: filepath.Clean(dir),
}
sourceInfo.UserName, sourceInfo.Host = service.GetRepoUser()
rootDir, err := getLocalFSEntry(sourceInfo.Path)
if err != nil {
return nil, errors.Wrap(err, "Unable to get local filesystem entry")
}
snapID, snapshotSize, err := SnapshotSource(ctx, repoWriter, fsUploader, sourceInfo, rootDir, parentSnapshot, log, "Kopia Uploader")
if err != nil {
return nil, err
}
snapshotInfo := &uploader.SnapshotInfo{
ID: snapID,
Size: snapshotSize,
}
return snapshotInfo, nil
}
func getLocalFSEntry(path0 string) (fs.Entry, error) {
path, err := resolveSymlink(path0)
if err != nil {
return nil, errors.Wrap(err, "resolveSymlink")
}
e, err := localfs.NewEntry(path)
if err != nil {
return nil, errors.Wrap(err, "can't get local fs entry")
}
return e, nil
}
//resolveSymlink returns the path name after the evaluation of any symbolic links
func resolveSymlink(path string) (string, error) {
st, err := os.Lstat(path)
if err != nil {
return "", errors.Wrap(err, "stat")
}
if (st.Mode() & os.ModeSymlink) == 0 {
return path, nil
}
return filepath.EvalSymlinks(path)
}
//SnapshotSource which setup policy for snapshot, upload snapshot, update progress
func SnapshotSource(
ctx context.Context,
rep repo.RepositoryWriter,
u SnapshotUploader,
sourceInfo snapshot.SourceInfo,
rootDir fs.Entry,
parentSnapshot string,
log logrus.FieldLogger,
description string,
) (string, int64, error) {
log.Info("Start to snapshot...")
snapshotStartTime := time.Now()
var previous []*snapshot.Manifest
if parentSnapshot != "" {
mani, err := loadSnapshotFunc(ctx, rep, manifest.ID(parentSnapshot))
if err != nil {
return "", 0, errors.Wrapf(err, "Failed to load previous snapshot %v from kopia", parentSnapshot)
}
previous = append(previous, mani)
} else {
pre, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo, nil)
if err != nil {
return "", 0, errors.Wrapf(err, "Failed to find previous kopia snapshot manifests for si %v", sourceInfo)
}
previous = pre
}
var manifest *snapshot.Manifest
if err := setupDefaultPolicy(ctx, rep, sourceInfo); err != nil {
return "", 0, errors.Wrapf(err, "unable to set policy for si %v", sourceInfo)
}
policyTree, err := treeForSourceFunc(ctx, rep, sourceInfo)
if err != nil {
return "", 0, errors.Wrapf(err, "unable to create policy getter for si %v", sourceInfo)
}
manifest, err = u.Upload(ctx, rootDir, policyTree, sourceInfo, previous...)
if err != nil {
return "", 0, errors.Wrapf(err, "Failed to upload the kopia snapshot for si %v", sourceInfo)
}
manifest.Description = description
if _, err = saveSnapshotFunc(ctx, rep, manifest); err != nil {
return "", 0, errors.Wrapf(err, "Failed to save kopia manifest %v", manifest.ID)
}
_, err = applyRetentionPolicyFunc(ctx, rep, sourceInfo, true)
if err != nil {
return "", 0, errors.Wrapf(err, "Failed to apply kopia retention policy for si %v", sourceInfo)
}
if err = rep.Flush(ctx); err != nil {
return "", 0, errors.Wrapf(err, "Failed to flush kopia repository")
}
log.Infof("Created snapshot with root %v and ID %v in %v", manifest.RootObjectID(), manifest.ID, time.Since(snapshotStartTime).Truncate(time.Second))
return reportSnapshotStatus(manifest)
}
func reportSnapshotStatus(manifest *snapshot.Manifest) (string, int64, error) {
manifestID := manifest.ID
snapSize := manifest.Stats.TotalFileSize
var errs []string
if ds := manifest.RootEntry.DirSummary; ds != nil {
for _, ent := range ds.FailedEntries {
errs = append(errs, ent.Error)
}
}
if len(errs) != 0 {
return "", 0, errors.New(strings.Join(errs, "\n"))
}
return string(manifestID), snapSize, nil
}
// findPreviousSnapshotManifest returns the list of previous snapshots for a given source, including
// last complete snapshot following it.
func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sourceInfo snapshot.SourceInfo, noLaterThan *time.Time) ([]*snapshot.Manifest, error) {
man, err := snapshot.ListSnapshots(ctx, rep, sourceInfo)
if err != nil {
return nil, err
}
var previousComplete *snapshot.Manifest
var result []*snapshot.Manifest
for _, p := range man {
if noLaterThan != nil && p.StartTime.After(*noLaterThan) {
continue
}
if p.IncompleteReason == "" && (previousComplete == nil || p.StartTime.After(previousComplete.StartTime)) {
previousComplete = p
}
}
if previousComplete != nil {
result = append(result, previousComplete)
}
return result, nil
}
//Restore restore specific sourcePath with given snapshotID and update progress
func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
log.Info("Start to restore...")
rootEntry, err := snapshotfs.FilesystemEntryFromIDWithPath(ctx, rep, snapshotID, false)
if err != nil {
return 0, 0, errors.Wrapf(err, "Unable to get filesystem entry for snapshot %v", snapshotID)
}
path, err := filepath.Abs(dest)
if err != nil {
return 0, 0, errors.Wrapf(err, "Unable to resolve path %v", dest)
}
output := &restore.FilesystemOutput{
TargetPath: path,
OverwriteDirectories: true,
OverwriteFiles: true,
OverwriteSymlinks: true,
IgnorePermissionErrors: true,
}
stat, err := restore.Entry(ctx, rep, output, rootEntry, restore.Options{
Parallel: runtime.NumCPU(),
RestoreDirEntryAtDepth: math.MaxInt32,
Cancel: cancleCh,
ProgressCallback: func(ctx context.Context, stats restore.Stats) {
progress.ProgressBytes(stats.RestoredTotalFileSize, stats.EnqueuedTotalFileSize)
},
})
if err != nil {
return 0, 0, errors.Wrapf(err, "Failed to copy snapshot data to the target")
}
return stat.RestoredTotalFileSize, stat.RestoredFileCount, nil
}

View File

@@ -0,0 +1,199 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kopia
import (
"context"
"testing"
"github.com/kopia/kopia/snapshot"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
repomocks "github.com/vmware-tanzu/velero/pkg/repository/mocks"
"github.com/vmware-tanzu/velero/pkg/uploader"
uploadermocks "github.com/vmware-tanzu/velero/pkg/uploader/mocks"
)
type snapshotMockes struct {
policyMock *uploadermocks.Policy
snapshotMock *uploadermocks.Snapshot
uploderMock *uploadermocks.Uploader
repoWriterMock *repomocks.RepositoryWriter
}
type mockArgs struct {
methodName string
returns []interface{}
}
func InjectSnapshotFuncs() *snapshotMockes {
s := &snapshotMockes{
policyMock: &uploadermocks.Policy{},
snapshotMock: &uploadermocks.Snapshot{},
uploderMock: &uploadermocks.Uploader{},
repoWriterMock: &repomocks.RepositoryWriter{},
}
setPolicyFunc = s.policyMock.SetPolicy
treeForSourceFunc = s.policyMock.TreeForSource
applyRetentionPolicyFunc = s.policyMock.ApplyRetentionPolicy
loadSnapshotFunc = s.snapshotMock.LoadSnapshot
saveSnapshotFunc = s.snapshotMock.SaveSnapshot
return s
}
func MockFuncs(s *snapshotMockes, args []mockArgs) {
s.snapshotMock.On("LoadSnapshot", mock.Anything, mock.Anything, mock.Anything).Return(args[0].returns...)
s.snapshotMock.On("SaveSnapshot", mock.Anything, mock.Anything, mock.Anything).Return(args[1].returns...)
s.policyMock.On("TreeForSource", mock.Anything, mock.Anything, mock.Anything).Return(args[2].returns...)
s.policyMock.On("ApplyRetentionPolicy", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(args[3].returns...)
s.policyMock.On("SetPolicy", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(args[4].returns...)
s.uploderMock.On("Upload", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(args[5].returns...)
s.repoWriterMock.On("Flush", mock.Anything).Return(args[6].returns...)
}
func TestSnapshotSource(t *testing.T) {
ctx := context.TODO()
sourceInfo := snapshot.SourceInfo{
UserName: "testUserName",
Host: "testHost",
Path: "/var",
}
rootDir, err := getLocalFSEntry(sourceInfo.Path)
assert.NoError(t, err)
log := logrus.New()
manifest := &snapshot.Manifest{
ID: "test",
RootEntry: &snapshot.DirEntry{},
}
testCases := []struct {
name string
args []mockArgs
notError bool
}{
{
name: "regular test",
args: []mockArgs{
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}},
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}},
{methodName: "SetPolicy", returns: []interface{}{nil}},
{methodName: "Upload", returns: []interface{}{manifest, nil}},
{methodName: "Flush", returns: []interface{}{nil}},
},
notError: true,
},
{
name: "failed to load snapshot",
args: []mockArgs{
{methodName: "LoadSnapshot", returns: []interface{}{manifest, errors.New("failed to load snapshot")}},
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}},
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}},
{methodName: "SetPolicy", returns: []interface{}{nil}},
{methodName: "Upload", returns: []interface{}{manifest, nil}},
{methodName: "Flush", returns: []interface{}{nil}},
},
notError: false,
},
{
name: "failed to save snapshot",
args: []mockArgs{
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, errors.New("failed to save snapshot")}},
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}},
{methodName: "SetPolicy", returns: []interface{}{nil}},
{methodName: "Upload", returns: []interface{}{manifest, nil}},
{methodName: "Flush", returns: []interface{}{nil}},
},
notError: false,
},
{
name: "failed to apply policy",
args: []mockArgs{
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}},
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, errors.New("failed to save snapshot")}},
{methodName: "SetPolicy", returns: []interface{}{nil}},
{methodName: "Upload", returns: []interface{}{manifest, nil}},
{methodName: "Flush", returns: []interface{}{nil}},
},
notError: false,
},
{
name: "failed to set policy",
args: []mockArgs{
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}},
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}},
{methodName: "SetPolicy", returns: []interface{}{errors.New("failed to set policy")}},
{methodName: "Upload", returns: []interface{}{manifest, nil}},
{methodName: "Flush", returns: []interface{}{nil}},
},
notError: false,
},
{
name: "failed to upload snapshot",
args: []mockArgs{
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, nil}},
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}},
{methodName: "SetPolicy", returns: []interface{}{nil}},
{methodName: "Upload", returns: []interface{}{manifest, errors.New("failed to upload snapshot")}},
{methodName: "Flush", returns: []interface{}{nil}},
},
notError: false,
},
{
name: "failed to flush repo",
args: []mockArgs{
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
{methodName: "SaveSnapshot", returns: []interface{}{manifest.ID, errors.New("failed to save snapshot")}},
{methodName: "TreeForSource", returns: []interface{}{nil, nil}},
{methodName: "ApplyRetentionPolicy", returns: []interface{}{nil, nil}},
{methodName: "SetPolicy", returns: []interface{}{nil}},
{methodName: "Upload", returns: []interface{}{manifest, nil}},
{methodName: "Flush", returns: []interface{}{errors.New("failed to flush repo")}},
},
notError: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
s := InjectSnapshotFuncs()
MockFuncs(s, tc.args)
_, _, err = SnapshotSource(ctx, s.repoWriterMock, s.uploderMock, sourceInfo, rootDir, "/", log, "TestSnapshotSource", func(up uploader.UploaderProgress) {})
if tc.notError {
assert.NoError(t, err)
} else {
assert.Error(t, err)
}
})
}
}

View File

@@ -0,0 +1,92 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mocks
import (
"context"
"github.com/kopia/kopia/snapshot/policy"
"github.com/stretchr/testify/mock"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot"
)
// policy is an autogenerated mock type for the TreeForSource type
type Policy struct {
mock.Mock
}
// Execute provides a mock function with given fields: ctx, rep, si
func (_m *Policy) TreeForSource(ctx context.Context, rep repo.Repository, si snapshot.SourceInfo) (*policy.Tree, error) {
ret := _m.Called(ctx, rep, si)
var r0 *policy.Tree
if rf, ok := ret.Get(0).(func(context.Context, repo.Repository, snapshot.SourceInfo) *policy.Tree); ok {
r0 = rf(ctx, rep, si)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*policy.Tree)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, repo.Repository, snapshot.SourceInfo) error); ok {
r1 = rf(ctx, rep, si)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ApplyRetentionPolicy provides a mock function with given fields: ctx, rep, sourceInfo, reallyDelete
func (_m *Policy) ApplyRetentionPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceInfo snapshot.SourceInfo, reallyDelete bool) ([]*snapshot.Manifest, error) {
ret := _m.Called(ctx, rep, sourceInfo, reallyDelete)
var r0 []*snapshot.Manifest
if rf, ok := ret.Get(0).(func(context.Context, repo.RepositoryWriter, snapshot.SourceInfo, bool) []*snapshot.Manifest); ok {
r0 = rf(ctx, rep, sourceInfo, reallyDelete)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*snapshot.Manifest)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, repo.RepositoryWriter, snapshot.SourceInfo, bool) error); ok {
r1 = rf(ctx, rep, sourceInfo, reallyDelete)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
func (_m *Policy) SetPolicy(ctx context.Context, rep repo.RepositoryWriter, si snapshot.SourceInfo, pol *policy.Policy) error {
ret := _m.Called(ctx, rep, si, pol)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, repo.RepositoryWriter, snapshot.SourceInfo, *policy.Policy) error); ok {
r0 = rf(ctx, rep, si, pol)
} else {
r0 = ret.Error(0)
}
return r0
}

View File

@@ -0,0 +1,42 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mocks
import (
context "context"
mock "github.com/stretchr/testify/mock"
)
// shimRepository is an autogenerated mock type for the shimRepository type
type ShimRepository struct {
mock.Mock
}
// Flush provides a mock function with given fields: ctx
func (_m *ShimRepository) Flush(ctx context.Context) error {
ret := _m.Called(ctx)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}

View File

@@ -0,0 +1,76 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mocks
import (
"context"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/stretchr/testify/mock"
"github.com/kopia/kopia/repo"
)
// snapshot is an autogenerated mock type for the snapshot type
type Snapshot struct {
mock.Mock
}
// LoadSnapshot provides a mock function with given fields: ctx, rep, manifestID
func (_m *Snapshot) LoadSnapshot(ctx context.Context, rep repo.Repository, manifestID manifest.ID) (*snapshot.Manifest, error) {
ret := _m.Called(ctx, rep, manifestID)
var r0 *snapshot.Manifest
if rf, ok := ret.Get(0).(func(context.Context, repo.Repository, manifest.ID) *snapshot.Manifest); ok {
r0 = rf(ctx, rep, manifestID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*snapshot.Manifest)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, repo.Repository, manifest.ID) error); ok {
r1 = rf(ctx, rep, manifestID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// SaveSnapshot provides a mock function with given fields: ctx, rep, man
func (_m *Snapshot) SaveSnapshot(ctx context.Context, rep repo.RepositoryWriter, man *snapshot.Manifest) (manifest.ID, error) {
ret := _m.Called(ctx, rep, man)
var r0 manifest.ID
if rf, ok := ret.Get(0).(func(context.Context, repo.RepositoryWriter, *snapshot.Manifest) manifest.ID); ok {
r0 = rf(ctx, rep, man)
} else {
r0 = ret.Get(0).(manifest.ID)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, repo.RepositoryWriter, *snapshot.Manifest) error); ok {
r1 = rf(ctx, rep, man)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

View File

@@ -0,0 +1,63 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mocks
import (
"context"
"github.com/kopia/kopia/fs"
"github.com/stretchr/testify/mock"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot"
)
// Upload is an autogenerated mock type for the Upload type
type Uploader struct {
mock.Mock
}
// Execute provides a mock function with given fields: ctx, source, policyTree, sourceInfo, previousManifests
func (_m *Uploader) Upload(ctx context.Context, source fs.Entry, policyTree *policy.Tree, sourceInfo snapshot.SourceInfo, previousManifests ...*snapshot.Manifest) (*snapshot.Manifest, error) {
_va := make([]interface{}, len(previousManifests))
for _i := range previousManifests {
_va[_i] = previousManifests[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, source, policyTree, sourceInfo)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *snapshot.Manifest
if rf, ok := ret.Get(0).(func(context.Context, fs.Entry, *policy.Tree, snapshot.SourceInfo, ...*snapshot.Manifest) *snapshot.Manifest); ok {
r0 = rf(ctx, source, policyTree, sourceInfo, previousManifests...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*snapshot.Manifest)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, fs.Entry, *policy.Tree, snapshot.SourceInfo, ...*snapshot.Manifest) error); ok {
r1 = rf(ctx, source, policyTree, sourceInfo, previousManifests...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

View File

@@ -0,0 +1,204 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provider
import (
"context"
"fmt"
"strings"
"time"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/vmware-tanzu/velero/pkg/uploader/kopia"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
repokeys "github.com/vmware-tanzu/velero/pkg/repository/keys"
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo/service"
)
//BackupFunc mainly used to make testing more convenient
var BackupFunc = kopia.Backup
var RestoreFunc = kopia.Restore
//kopiaProvider recorded info related with kopiaProvider
//action which means provider handle backup or restore
type kopiaProvider struct {
bkRepo udmrepo.BackupRepo
credGetter *credentials.CredentialGetter
uploader *snapshotfs.Uploader
restoreCancel chan struct{}
log logrus.FieldLogger
}
//NewKopiaUploaderProvider initialized with open or create a repository
func NewKopiaUploaderProvider(
ctx context.Context,
credGetter *credentials.CredentialGetter,
bsl *velerov1api.BackupStorageLocation,
log logrus.FieldLogger,
) (Provider, error) {
kp := &kopiaProvider{
log: log,
credGetter: credGetter,
}
//repoUID which is used to generate kopia repository config with unique directory path
repoUID := string(bsl.GetUID())
repoOpt, err := udmrepo.NewRepoOptions(
udmrepo.WithPassword(kp, ""),
udmrepo.WithConfigFile("", repoUID),
udmrepo.WithDescription("Initial kopia uploader provider"),
)
if err != nil {
return nil, errors.Wrapf(err, "error to get repo options")
}
repoSvc := service.Create(log)
log.WithField("repoUID", repoUID).Info("Opening backup repo")
kp.bkRepo, err = repoSvc.Open(ctx, *repoOpt)
if err != nil {
return nil, errors.Wrapf(err, "Failed to find kopia repository")
}
return kp, nil
}
//CheckContext check context status periodically
//check if context is timeout or cancel
func (kp *kopiaProvider) CheckContext(ctx context.Context) {
for {
select {
case <-ctx.Done():
if kp.uploader != nil {
kp.uploader.Cancel()
kp.log.Infof("Backup is been canceled")
}
if kp.restoreCancel != nil {
close(kp.restoreCancel)
kp.log.Infof("Restore is been canceled")
}
return
default:
time.Sleep(time.Second * 10)
}
}
}
func (kp *kopiaProvider) Close(ctx context.Context) {
kp.bkRepo.Close(ctx)
}
//RunBackup which will backup specific path and update backup progress in pvb status
func (kp *kopiaProvider) RunBackup(
ctx context.Context,
path string,
tags map[string]string,
parentSnapshot string,
updater velerov1api.ProgressUpdater) (string, error) {
if updater == nil {
return "", errors.New("Need to inital backup progress updater first")
}
log := kp.log.WithFields(logrus.Fields{
"path": path,
"parentSnapshot": parentSnapshot,
})
repoWriter := kopia.NewShimRepo(kp.bkRepo)
kp.uploader = snapshotfs.NewUploader(repoWriter)
prorgess := new(kopia.KopiaProgress)
prorgess.InitThrottle(backupProgressCheckInterval)
prorgess.Updater = updater
kp.uploader.Progress = prorgess
log.Info("Starting backup")
go kp.CheckContext(ctx)
snapshotInfo, err := BackupFunc(ctx, kp.uploader, repoWriter, path, parentSnapshot, log)
if err != nil {
return "", errors.Wrapf(err, "Failed to run kopia backup")
} else if snapshotInfo == nil {
return "", fmt.Errorf("failed to get kopia backup snapshot info for path %v", path)
}
updater.UpdateProgress(
&velerov1api.UploaderProgress{
TotalBytes: snapshotInfo.Size,
BytesDone: snapshotInfo.Size,
},
)
log.Debugf("Kopia backup finished, snapshot ID %s, backup size %d", snapshotInfo.ID, snapshotInfo.Size)
return snapshotInfo.ID, nil
}
func (kp *kopiaProvider) GetPassword(param interface{}) (string, error) {
if kp.credGetter.FromSecret == nil {
return "", errors.New("invalid credentials interface")
}
rawPass, err := kp.credGetter.FromSecret.Get(repokeys.RepoKeySelector())
if err != nil {
return "", errors.Wrap(err, "error to get password")
}
return strings.TrimSpace(rawPass), nil
}
//RunRestore which will restore specific path and update restore progress in pvr status
func (kp *kopiaProvider) RunRestore(
ctx context.Context,
snapshotID string,
volumePath string,
updater velerov1api.ProgressUpdater) error {
log := kp.log.WithFields(logrus.Fields{
"snapshotID": snapshotID,
"volumePath": volumePath,
})
repoWriter := kopia.NewShimRepo(kp.bkRepo)
prorgess := new(kopia.KopiaProgress)
prorgess.InitThrottle(restoreProgressCheckInterval)
prorgess.Updater = updater
kp.restoreCancel = make(chan struct{})
defer func() {
if kp.restoreCancel != nil {
close(kp.restoreCancel)
}
}()
log.Info("Starting restore")
go kp.CheckContext(ctx)
size, fileCount, err := RestoreFunc(ctx, repoWriter, prorgess, snapshotID, volumePath, log, kp.restoreCancel)
if err != nil {
return errors.Wrapf(err, "Failed to run kopia restore")
}
updater.UpdateProgress(&velerov1api.UploaderProgress{
TotalBytes: size,
BytesDone: size,
})
output := fmt.Sprintf("Kopia restore finished, restore size %d, file count %d", size, fileCount)
log.Info(output)
return nil
}

View File

@@ -0,0 +1,118 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provider
import (
"context"
"testing"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/scheme"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/uploader/kopia"
)
func TestRunBackup(t *testing.T) {
var kp kopiaProvider
kp.log = logrus.New()
fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme)
updater := velerov1api.NewBackupProgressUpdater(&velerov1api.PodVolumeBackup{}, kp.log, context.Background(), fakeClient)
testCases := []struct {
name string
hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error)
notError bool
}{
{
name: "success to backup",
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
return &uploader.SnapshotInfo{}, nil
},
notError: true,
},
{
name: "get error to backup",
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
return &uploader.SnapshotInfo{}, errors.New("failed to backup")
},
notError: false,
},
{
name: "got empty snapshot",
hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, error) {
return nil, nil
},
notError: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
BackupFunc = tc.hookBackupFunc
_, err := kp.RunBackup(context.Background(), "var", nil, "", updater)
if tc.notError {
assert.NoError(t, err)
} else {
assert.Error(t, err)
}
})
}
}
func TestRunRestore(t *testing.T) {
var kp kopiaProvider
kp.log = logrus.New()
updater := velerov1api.NewRestoreProgressUpdater(&velerov1api.PodVolumeRestore{}, kp.log, context.Background(), fake.NewFakeClientWithScheme(scheme.Scheme))
testCases := []struct {
name string
hookRestoreFunc func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error)
notError bool
}{
{
name: "normal restore",
hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
return 0, 0, nil
},
notError: true,
},
{
name: "failed to restore",
hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
return 0, 0, errors.New("failed to restore")
},
notError: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
RestoreFunc = tc.hookRestoreFunc
err := kp.RunRestore(context.Background(), "", "/var", updater)
if tc.notError {
assert.NoError(t, err)
} else {
assert.Error(t, err)
}
})
}
}

View File

@@ -18,27 +18,53 @@ package provider
import (
"context"
"time"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/uploader"
)
const restoreProgressCheckInterval = 10 * time.Second
const backupProgressCheckInterval = 10 * time.Second
// Provider which is designed for one pod volumn to do the backup or restore
type Provider interface {
// RunBackup which will do backup for one specific volumn and return snapshotID error
// updateFunc which is used for update backup progress into related pvb status
// updater which is used for update backup progress into related pvb status
RunBackup(
ctx context.Context,
path string,
tags map[string]string,
parentSnapshot string,
updateFunc func(velerov1api.PodVolumeOperationProgress)) (string, error)
updater velerov1api.ProgressUpdater) (string, error)
// RunRestore which will do restore for one specific volumn with given snapshot id and return error
// updateFunc which is used for update restore progress into related pvr status
RunRestore(
ctx context.Context,
snapshotID string,
volumePath string,
updateFunc func(velerov1api.PodVolumeOperationProgress)) error
updater velerov1api.ProgressUpdater) error
// Close which will close related repository
Close(ctx context.Context)
}
//NewUploaderProvider initialize provider with specific uploader_type
func NewUploaderProvider(
ctx context.Context,
uploader_type string,
repoIdentifier string,
bsl *velerov1api.BackupStorageLocation,
credGetter *credentials.CredentialGetter,
repoKeySelector *v1.SecretKeySelector,
log logrus.FieldLogger,
) (Provider, error) {
if uploader_type == uploader.KopiaType {
return NewResticUploaderProvider(repoIdentifier, bsl, credGetter, repoKeySelector, log)
} else {
return NewKopiaUploaderProvider(ctx, credGetter, bsl, log)
}
}

View File

@@ -0,0 +1,34 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provider
import (
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)
func NewResticUploaderProvider(
repoIdentifier string,
bsl *velerov1api.BackupStorageLocation,
credGetter *credentials.CredentialGetter,
repoKeySelector *v1.SecretKeySelector,
log logrus.FieldLogger,
) (Provider, error) {
return nil, nil //TODO
}

View File

@@ -22,10 +22,8 @@ import (
)
const (
ResticType = "restic"
KopiaType = "kopia"
VeleroBackup = "backup"
VeleroRestore = "restore"
ResticType = "restic"
KopiaType = "kopia"
)
// ValidateUploaderType validates if the input param is a valid uploader type.
@@ -42,8 +40,3 @@ type SnapshotInfo struct {
ID string `json:"id"`
Size int64 `json:"Size"`
}
type UploaderProgress struct {
TotalBytes int64 `json:"totalBytes,omitempty"`
BytesDone int64 `json:"doneBytes,omitempty"`
}