kopia pvbr smoking test changes

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
Lyndon-Li
2022-09-05 12:04:18 +08:00
parent 0282e65221
commit a90ba3db7c
22 changed files with 359 additions and 322 deletions

View File

@@ -0,0 +1 @@
Add changes for problems/enhancements found during smoking test for Kopia pod volume backup/restore

View File

@@ -26,7 +26,6 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
ctrl "sigs.k8s.io/controller-runtime"
@@ -34,10 +33,10 @@ import (
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/podvolume"
"github.com/vmware-tanzu/velero/pkg/repository"
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
"github.com/vmware-tanzu/velero/pkg/repository/util"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/uploader/provider"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
@@ -145,22 +144,19 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}, backupLocation); err != nil {
return ctrl.Result{}, errors.Wrap(err, "error getting backup storage location")
}
selector := labels.SelectorFromSet(
map[string]string{
//TODO
//velerov1api.VolumeNamespaceLabel: label.GetValidName(volumeNamespace),
velerov1api.StorageLocationLabel: label.GetValidName(pvb.Spec.BackupStorageLocation),
//velerov1api.RepositoryTypeLabel: label.GetValidName(repositoryType),
},
)
backupRepo, err := util.GetBackupRepositoryByLabel(ctx, r.Client, pvb.Namespace, selector)
backupRepo, err := repository.GetBackupRepository(ctx, r.Client, pvb.Namespace, repository.BackupRepositoryKey{
VolumeNamespace: pvb.Spec.Pod.Namespace,
BackupLocation: pvb.Spec.BackupStorageLocation,
RepositoryType: podvolume.GetPvbRepositoryType(&pvb),
})
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "error getting backup repository")
}
var uploaderProv provider.Provider
uploaderProv, err = NewUploaderProviderFunc(ctx, r.Client, pvb.Spec.UploaderType, pvb.Spec.RepoIdentifier,
backupLocation, &backupRepo, r.CredentialGetter, repokey.RepoKeySelector(), log)
backupLocation, backupRepo, r.CredentialGetter, repokey.RepoKeySelector(), log)
if err != nil {
return r.updateStatusToFailed(ctx, &pvb, err, "error creating uploader", log)
}

View File

@@ -80,11 +80,16 @@ func buildBackupRepo() *velerov1api.BackupRepository {
},
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1api.DefaultNamespace,
Name: fmt.Sprintf("%s-bsl-loc-dn24h", velerov1api.DefaultNamespace),
Name: fmt.Sprintf("%s-bsl-loc-restic-dn24h", velerov1api.DefaultNamespace),
Labels: map[string]string{
velerov1api.StorageLocationLabel: "bsl-loc",
velerov1api.VolumeNamespaceLabel: velerov1api.DefaultNamespace,
velerov1api.RepositoryTypeLabel: "restic",
},
},
Status: velerov1api.BackupRepositoryStatus{
Phase: velerov1api.BackupRepositoryPhaseReady,
},
}
}

View File

@@ -39,10 +39,9 @@ import (
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/podvolume"
"github.com/vmware-tanzu/velero/pkg/repository"
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
"github.com/vmware-tanzu/velero/pkg/repository/util"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/uploader/provider"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
@@ -250,21 +249,17 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve
return errors.Wrap(err, "error getting backup storage location")
}
selector := labels.SelectorFromSet(
map[string]string{
//TODO
//velerov1api.VolumeNamespaceLabel: label.GetValidName(volumeNamespace),
velerov1api.StorageLocationLabel: label.GetValidName(req.Spec.BackupStorageLocation),
//velerov1api.RepositoryTypeLabel: label.GetValidName(repositoryType),
},
)
backupRepo, err := util.GetBackupRepositoryByLabel(ctx, c.Client, req.Namespace, selector)
backupRepo, err := repository.GetBackupRepository(ctx, c.Client, req.Namespace, repository.BackupRepositoryKey{
VolumeNamespace: req.Spec.Pod.Namespace,
BackupLocation: req.Spec.BackupStorageLocation,
RepositoryType: podvolume.GetPvrRepositoryType(req),
})
if err != nil {
return errors.Wrap(err, "error getting backup repository")
}
uploaderProv, err := provider.NewUploaderProvider(ctx, c.Client, req.Spec.UploaderType,
req.Spec.RepoIdentifier, backupLocation, &backupRepo, c.credentialGetter, repokey.RepoKeySelector(), log)
req.Spec.RepoIdentifier, backupLocation, backupRepo, c.credentialGetter, repokey.RepoKeySelector(), log)
if err != nil {
return errors.Wrap(err, "error creating uploader")
}

View File

@@ -18,7 +18,6 @@ package controller
import (
"context"
"strings"
"time"
"github.com/pkg/errors"
@@ -173,23 +172,10 @@ func (r *ResticRepoReconciler) getRepositoryMaintenanceFrequency(req *velerov1ap
}
}
// ensureRepo checks to see if a repository exists, and attempts to initialize it if
// it does not exist. An error is returned if the repository can't be connected to
// or initialized.
// ensureRepo calls repo manager's PrepareRepo to ensure the repo is ready for use.
// An error is returned if the repository can't be connected to or initialized.
func ensureRepo(repo *velerov1api.BackupRepository, repoManager repository.Manager) error {
if err := repoManager.ConnectToRepo(repo); err != nil {
// If the repository has not yet been initialized, the error message will always include
// the following string. This is the only scenario where we should try to initialize it.
// Other errors (e.g. "already locked") should be returned as-is since the repository
// does already exist, but it can't be connected to.
if strings.Contains(err.Error(), "Is there a repository at the following location?") {
return repoManager.InitRepo(repo)
}
return err
}
return nil
return repoManager.PrepareRepo(repo)
}
func (r *ResticRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {

View File

@@ -34,7 +34,7 @@ import (
const testMaintenanceFrequency = 10 * time.Minute
func mockResticRepoReconciler(t *testing.T, rr *velerov1api.BackupRepository, mockOn string, arg interface{}, ret interface{}) *ResticRepoReconciler {
mgr := &repomokes.RepositoryManager{}
mgr := &repomokes.Manager{}
if mockOn != "" {
mgr.On(mockOn, arg).Return(ret)
}
@@ -75,7 +75,7 @@ func TestPatchResticRepository(t *testing.T) {
func TestCheckNotReadyRepo(t *testing.T) {
rr := mockResticRepositoryCR()
reconciler := mockResticRepoReconciler(t, rr, "ConnectToRepo", rr, nil)
reconciler := mockResticRepoReconciler(t, rr, "PrepareRepo", rr, nil)
err := reconciler.Client.Create(context.TODO(), rr)
assert.NoError(t, err)
err = reconciler.checkNotReadyRepo(context.TODO(), rr, reconciler.logger)
@@ -107,7 +107,7 @@ func TestRunMaintenanceIfDue(t *testing.T) {
func TestInitializeRepo(t *testing.T) {
rr := mockResticRepositoryCR()
rr.Spec.BackupStorageLocation = "default"
reconciler := mockResticRepoReconciler(t, rr, "ConnectToRepo", rr, nil)
reconciler := mockResticRepoReconciler(t, rr, "PrepareRepo", rr, nil)
err := reconciler.Client.Create(context.TODO(), rr)
assert.NoError(t, err)
locations := &velerov1api.BackupStorageLocation{
@@ -225,7 +225,7 @@ func TestGetRepositoryMaintenanceFrequency(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
mgr := repomokes.RepositoryManager{}
mgr := repomokes.Manager{}
mgr.On("DefaultMaintenanceFrequency", mock.Anything).Return(test.freqReturn, test.freqError)
reconciler := NewResticRepoReconciler(
velerov1api.DefaultNamespace,

View File

@@ -40,6 +40,7 @@ func NewObjectStoreLayout(prefix string) *ObjectStoreLayout {
"restic": path.Join(prefix, "restic") + "/",
"metadata": path.Join(prefix, "metadata") + "/",
"plugins": path.Join(prefix, "plugins") + "/",
"kopia": path.Join(prefix, "kopia") + "/",
}
return &ObjectStoreLayout{

View File

@@ -212,6 +212,7 @@ func newPodVolumeRestore(restore *velerov1api.Restore, pod *corev1api.Pod, backu
SnapshotID: snapshot,
BackupStorageLocation: backupLocation,
RepoIdentifier: repoIdentifier,
UploaderType: uploaderType,
},
}
if pvc != nil {

View File

@@ -73,6 +73,16 @@ func GetVolumeBackupsForPod(podVolumeBackups []*velerov1api.PodVolumeBackup, pod
return volumes
}
// GetPvbRepositoryType returns the repositoryType according to the PVB information
func GetPvbRepositoryType(pvb *velerov1api.PodVolumeBackup) string {
return getRepositoryType(pvb.Spec.UploaderType)
}
// GetPvrRepositoryType returns the repositoryType according to the PVR information
func GetPvrRepositoryType(pvr *velerov1api.PodVolumeRestore) string {
return getRepositoryType(pvr.Spec.UploaderType)
}
// getVolumeBackupInfoForPod returns a map, of volume name -> VolumeBackupInfo,
// of the PodVolumeBackups that exist for the provided pod.
func getVolumeBackupInfoForPod(podVolumeBackups []*velerov1api.PodVolumeBackup, pod *corev1api.Pod, sourcePodNs string) map[string]volumeBackupInfo {

View File

@@ -0,0 +1,85 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package repository
import (
"context"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
)
// A BackupRepositoryKey uniquely identify a backup repository
type BackupRepositoryKey struct {
VolumeNamespace string
BackupLocation string
RepositoryType string
}
var (
backupRepoNotFoundError = errors.New("backup repository not found")
)
func repoLabelsFromKey(key BackupRepositoryKey) labels.Set {
return map[string]string{
velerov1api.VolumeNamespaceLabel: label.GetValidName(key.VolumeNamespace),
velerov1api.StorageLocationLabel: label.GetValidName(key.BackupLocation),
velerov1api.RepositoryTypeLabel: label.GetValidName(key.RepositoryType),
}
}
// GetBackupRepository gets a backup repository through BackupRepositoryKey and ensure ready if required.
func GetBackupRepository(ctx context.Context, cli client.Client, namespace string, key BackupRepositoryKey, options ...bool) (*velerov1api.BackupRepository, error) {
var ensureReady = true
if len(options) > 0 {
ensureReady = options[0]
}
selector := labels.SelectorFromSet(repoLabelsFromKey(key))
backupRepoList := &velerov1api.BackupRepositoryList{}
err := cli.List(ctx, backupRepoList, &client.ListOptions{
Namespace: namespace,
LabelSelector: selector,
})
if err != nil {
return nil, errors.Wrap(err, "error getting backup repository list")
}
if len(backupRepoList.Items) == 0 {
return nil, backupRepoNotFoundError
}
if len(backupRepoList.Items) > 1 {
return nil, errors.Errorf("more than one BackupRepository found for workload namespace %q, backup storage location %q, repository type %q", key.VolumeNamespace, key.BackupLocation, key.RepositoryType)
}
repo := &backupRepoList.Items[0]
if ensureReady {
if repo.Status.Phase != velerov1api.BackupRepositoryPhaseReady {
return nil, errors.Errorf("backup repository is not ready: %s", repo.Status.Message)
}
}
return repo, nil
}

View File

@@ -55,12 +55,14 @@ type Manager interface {
// InitRepo initializes a repo with the specified name and identifier.
InitRepo(repo *velerov1api.BackupRepository) error
// ConnectToRepo runs the 'restic snapshots' command against the
// specified repo, and returns an error if it fails. This is
// intended to be used to ensure that the repo exists/can be
// authenticated to.
// ConnectToRepo tries to connect to the specified repo, and returns an error if it fails.
// This is intended to be used to ensure that the repo exists/can be authenticated to.
ConnectToRepo(repo *velerov1api.BackupRepository) error
// PrepareRepo tries to connect to the specific repo first, if it fails because of the
// repo is not initialized, it turns to initialize the repo
PrepareRepo(repo *velerov1api.BackupRepository) error
// PruneRepo deletes unused data from a repo.
PruneRepo(repo *velerov1api.BackupRepository) error
@@ -108,7 +110,7 @@ func NewManager(
mgr.providers[velerov1api.BackupRepositoryTypeKopia] = provider.NewUnifiedRepoProvider(credentials.CredentialGetter{
FromFile: credentialFileStore,
FromSecret: credentialSecretStore,
}, mgr.log)
}, velerov1api.BackupRepositoryTypeKopia, mgr.log)
return mgr
}
@@ -143,6 +145,21 @@ func (m *manager) ConnectToRepo(repo *velerov1api.BackupRepository) error {
return prd.ConnectToRepo(context.Background(), param)
}
func (m *manager) PrepareRepo(repo *velerov1api.BackupRepository) error {
m.repoLocker.Lock(repo.Name)
defer m.repoLocker.Unlock(repo.Name)
prd, err := m.getRepositoryProvider(repo)
if err != nil {
return errors.WithStack(err)
}
param, err := m.assembleRepoParam(repo)
if err != nil {
return errors.WithStack(err)
}
return prd.PrepareRepo(context.Background(), param)
}
func (m *manager) PruneRepo(repo *velerov1api.BackupRepository) error {
m.repoLocker.LockExclusive(repo.Name)
defer m.repoLocker.UnlockExclusive(repo.Name)

View File

@@ -0,0 +1,139 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
package mocks
import (
context "context"
mock "github.com/stretchr/testify/mock"
repository "github.com/vmware-tanzu/velero/pkg/repository"
time "time"
v1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)
// Manager is an autogenerated mock type for the Manager type
type Manager struct {
mock.Mock
}
// ConnectToRepo provides a mock function with given fields: repo
func (_m *Manager) ConnectToRepo(repo *v1.BackupRepository) error {
ret := _m.Called(repo)
var r0 error
if rf, ok := ret.Get(0).(func(*v1.BackupRepository) error); ok {
r0 = rf(repo)
} else {
r0 = ret.Error(0)
}
return r0
}
// DefaultMaintenanceFrequency provides a mock function with given fields: repo
func (_m *Manager) DefaultMaintenanceFrequency(repo *v1.BackupRepository) (time.Duration, error) {
ret := _m.Called(repo)
var r0 time.Duration
if rf, ok := ret.Get(0).(func(*v1.BackupRepository) time.Duration); ok {
r0 = rf(repo)
} else {
r0 = ret.Get(0).(time.Duration)
}
var r1 error
if rf, ok := ret.Get(1).(func(*v1.BackupRepository) error); ok {
r1 = rf(repo)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Forget provides a mock function with given fields: _a0, _a1
func (_m *Manager) Forget(_a0 context.Context, _a1 repository.SnapshotIdentifier) error {
ret := _m.Called(_a0, _a1)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, repository.SnapshotIdentifier) error); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Error(0)
}
return r0
}
// InitRepo provides a mock function with given fields: repo
func (_m *Manager) InitRepo(repo *v1.BackupRepository) error {
ret := _m.Called(repo)
var r0 error
if rf, ok := ret.Get(0).(func(*v1.BackupRepository) error); ok {
r0 = rf(repo)
} else {
r0 = ret.Error(0)
}
return r0
}
// PrepareRepo provides a mock function with given fields: repo
func (_m *Manager) PrepareRepo(repo *v1.BackupRepository) error {
ret := _m.Called(repo)
var r0 error
if rf, ok := ret.Get(0).(func(*v1.BackupRepository) error); ok {
r0 = rf(repo)
} else {
r0 = ret.Error(0)
}
return r0
}
// PruneRepo provides a mock function with given fields: repo
func (_m *Manager) PruneRepo(repo *v1.BackupRepository) error {
ret := _m.Called(repo)
var r0 error
if rf, ok := ret.Get(0).(func(*v1.BackupRepository) error); ok {
r0 = rf(repo)
} else {
r0 = ret.Error(0)
}
return r0
}
// UnlockRepo provides a mock function with given fields: repo
func (_m *Manager) UnlockRepo(repo *v1.BackupRepository) error {
ret := _m.Called(repo)
var r0 error
if rf, ok := ret.Get(0).(func(*v1.BackupRepository) error); ok {
r0 = rf(repo)
} else {
r0 = ret.Error(0)
}
return r0
}
type mockConstructorTestingTNewManager interface {
mock.TestingT
Cleanup(func())
}
// NewManager creates a new instance of Manager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewManager(t mockConstructorTestingTNewManager) *Manager {
mock := &Manager{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -1,171 +0,0 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.*/
// Code generated by mockery v2.10.2. DO NOT EDIT.
package mocks
import (
context "context"
mock "github.com/stretchr/testify/mock"
time "time"
v1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/podvolume"
"github.com/vmware-tanzu/velero/pkg/repository"
)
// RepositoryManager is an autogenerated mock type for the RepositoryManager type
type RepositoryManager struct {
mock.Mock
}
// ConnectToRepo provides a mock function with given fields: repo
func (_m *RepositoryManager) ConnectToRepo(repo *v1.BackupRepository) error {
ret := _m.Called(repo)
var r0 error
if rf, ok := ret.Get(0).(func(*v1.BackupRepository) error); ok {
r0 = rf(repo)
} else {
r0 = ret.Error(0)
}
return r0
}
// Forget provides a mock function with given fields: _a0, _a1
func (_m *RepositoryManager) Forget(_a0 context.Context, _a1 repository.SnapshotIdentifier) error {
ret := _m.Called(_a0, _a1)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, repository.SnapshotIdentifier) error); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Error(0)
}
return r0
}
// DefaultMaintenanceFrequency provides a mock function with given fields: repo
func (_m *RepositoryManager) DefaultMaintenanceFrequency(repo *v1.BackupRepository) (time.Duration, error) {
ret := _m.Called(repo)
var r0 time.Duration
if rf, ok := ret.Get(0).(func(*v1.BackupRepository) time.Duration); ok {
r0 = rf(repo)
} else {
r0 = ret.Get(0).(time.Duration)
}
var r1 error
if rf, ok := ret.Get(1).(func(*v1.BackupRepository) error); ok {
r1 = rf(repo)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// InitRepo provides a mock function with given fields: repo
func (_m *RepositoryManager) InitRepo(repo *v1.BackupRepository) error {
ret := _m.Called(repo)
var r0 error
if rf, ok := ret.Get(0).(func(repository *v1.BackupRepository) error); ok {
r0 = rf(repo)
} else {
r0 = ret.Error(0)
}
return r0
}
// NewBackupper provides a mock function with given fields: _a0, _a1
func (_m *RepositoryManager) NewBackupper(_a0 context.Context, _a1 *v1.Backup) (podvolume.Backupper, error) {
ret := _m.Called(_a0, _a1)
var r0 podvolume.Backupper
if rf, ok := ret.Get(0).(func(context.Context, *v1.Backup) podvolume.Backupper); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(podvolume.Backupper)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *v1.Backup) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NewRestorer provides a mock function with given fields: _a0, _a1
func (_m *RepositoryManager) NewRestorer(_a0 context.Context, _a1 *v1.Restore) (podvolume.Restorer, error) {
ret := _m.Called(_a0, _a1)
var r0 podvolume.Restorer
if rf, ok := ret.Get(0).(func(context.Context, *v1.Restore) podvolume.Restorer); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(podvolume.Restorer)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *v1.Restore) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// PruneRepo provides a mock function with given fields: repo
func (_m *RepositoryManager) PruneRepo(repo *v1.BackupRepository) error {
ret := _m.Called(repo)
var r0 error
if rf, ok := ret.Get(0).(func(repository *v1.BackupRepository) error); ok {
r0 = rf(repo)
} else {
r0 = ret.Error(0)
}
return r0
}
// UnlockRepo provides a mock function with given fields: repo
func (_m *RepositoryManager) UnlockRepo(repo *v1.BackupRepository) error {
ret := _m.Called(repo)
var r0 error
if rf, ok := ret.Get(0).(func(repository *v1.BackupRepository) error); ok {
r0 = rf(repo)
} else {
r0 = ret.Error(0)
}
return r0
}

View File

@@ -18,6 +18,7 @@ package provider
import (
"context"
"strings"
"time"
"github.com/sirupsen/logrus"
@@ -46,10 +47,19 @@ func (r *resticRepositoryProvider) ConnectToRepo(ctx context.Context, param Repo
}
func (r *resticRepositoryProvider) PrepareRepo(ctx context.Context, param RepoParam) error {
if err := r.InitRepo(ctx, param); err != nil {
if err := r.ConnectToRepo(ctx, param); err != nil {
// If the repository has not yet been initialized, the error message will always include
// the following string. This is the only scenario where we should try to initialize it.
// Other errors (e.g. "already locked") should be returned as-is since the repository
// does already exist, but it can't be connected to.
if strings.Contains(err.Error(), "Is there a repository at the following location?") {
return r.InitRepo(ctx, param)
}
return err
}
return r.ConnectToRepo(ctx, param)
return nil
}
func (r *resticRepositoryProvider) PruneRepo(ctx context.Context, param RepoParam) error {

View File

@@ -38,6 +38,7 @@ type unifiedRepoProvider struct {
credentialGetter credentials.CredentialGetter
workPath string
repoService udmrepo.BackupRepoService
repoBackend string
log logrus.FieldLogger
}
@@ -50,7 +51,7 @@ var getS3BucketRegion = repoconfig.GetAWSBucketRegion
var getAzureStorageDomain = repoconfig.GetAzureStorageDomain
type localFuncTable struct {
getStorageVariables func(*velerov1api.BackupStorageLocation, string) (map[string]string, error)
getStorageVariables func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error)
getStorageCredentials func(*velerov1api.BackupStorageLocation, credentials.FileStore) (map[string]string, error)
}
@@ -69,10 +70,12 @@ const (
// NewUnifiedRepoProvider creates the service provider for Unified Repo
func NewUnifiedRepoProvider(
credentialGetter credentials.CredentialGetter,
repoBackend string,
log logrus.FieldLogger,
) Provider {
repo := unifiedRepoProvider{
credentialGetter: credentialGetter,
repoBackend: repoBackend,
log: log,
}
@@ -303,7 +306,7 @@ func (urp *unifiedRepoProvider) GetStoreOptions(param interface{}) (map[string]s
return map[string]string{}, errors.Errorf("invalid parameter, expect %T, actual %T", RepoParam{}, param)
}
storeVar, err := funcTable.getStorageVariables(repoParam.BackupLocation, repoParam.BackupRepo.Spec.VolumeNamespace)
storeVar, err := funcTable.getStorageVariables(repoParam.BackupLocation, urp.repoBackend, repoParam.BackupRepo.Spec.VolumeNamespace)
if err != nil {
return map[string]string{}, errors.Wrap(err, "error to get storage variables")
}
@@ -406,7 +409,7 @@ func getStorageCredentials(backupLocation *velerov1api.BackupStorageLocation, cr
return result, nil
}
func getStorageVariables(backupLocation *velerov1api.BackupStorageLocation, repoName string) (map[string]string, error) {
func getStorageVariables(backupLocation *velerov1api.BackupStorageLocation, repoBackend string, repoName string) (map[string]string, error) {
result := make(map[string]string)
backendType := repoconfig.GetBackendType(backupLocation.Spec.Provider)
@@ -426,7 +429,7 @@ func getStorageVariables(backupLocation *velerov1api.BackupStorageLocation, repo
prefix = strings.Trim(backupLocation.Spec.ObjectStorage.Prefix, "/")
}
prefix = path.Join(prefix, udmrepo.StoreOptionPrefixName, repoName) + "/"
prefix = path.Join(prefix, repoBackend, repoName) + "/"
region := config["region"]

View File

@@ -233,6 +233,7 @@ func TestGetStorageVariables(t *testing.T) {
name string
backupLocation velerov1api.BackupStorageLocation
repoName string
repoBackend string
getS3BucketRegion func(string) (string, error)
getAzureStorageDomain func(map[string]string) string
expected map[string]string
@@ -262,9 +263,10 @@ func TestGetStorageVariables(t *testing.T) {
},
},
},
repoBackend: "fake-repo-type",
expected: map[string]string{
"bucket": "fake-bucket",
"prefix": "fake-prefix/unified-repo/",
"prefix": "fake-prefix/fake-repo-type/",
"region": "fake-region",
"fspath": "",
"endpoint": "fake-url",
@@ -286,9 +288,10 @@ func TestGetStorageVariables(t *testing.T) {
getS3BucketRegion: func(bucket string) (string, error) {
return "region from bucket: " + bucket, nil
},
repoBackend: "fake-repo-type",
expected: map[string]string{
"bucket": "fake-bucket",
"prefix": "fake-prefix/unified-repo/",
"prefix": "fake-prefix/fake-repo-type/",
"region": "region from bucket: fake-bucket",
"fspath": "",
"endpoint": "s3-region from bucket: fake-bucket.amazonaws.com",
@@ -332,9 +335,10 @@ func TestGetStorageVariables(t *testing.T) {
getS3BucketRegion: func(bucket string) (string, error) {
return "region from bucket: " + bucket, nil
},
repoBackend: "fake-repo-type",
expected: map[string]string{
"bucket": "fake-bucket-object-store",
"prefix": "fake-prefix-object-store/unified-repo/",
"prefix": "fake-prefix-object-store/fake-repo-type/",
"region": "fake-region",
"fspath": "",
"endpoint": "fake-url",
@@ -364,9 +368,10 @@ func TestGetStorageVariables(t *testing.T) {
getAzureStorageDomain: func(config map[string]string) string {
return config["storageDomain"]
},
repoBackend: "fake-repo-type",
expected: map[string]string{
"bucket": "fake-bucket-object-store",
"prefix": "fake-prefix-object-store/unified-repo/",
"prefix": "fake-prefix-object-store/fake-repo-type/",
"region": "fake-region",
"fspath": "",
"storageDomain": "fake-domain",
@@ -386,13 +391,14 @@ func TestGetStorageVariables(t *testing.T) {
},
},
},
repoName: "//fake-name//",
repoName: "//fake-name//",
repoBackend: "fake-repo-type",
getAzureStorageDomain: func(config map[string]string) string {
return config["storageDomain"]
},
expected: map[string]string{
"bucket": "fake-bucket",
"prefix": "fake-prefix/unified-repo/fake-name/",
"prefix": "fake-prefix/fake-repo-type/fake-name/",
"region": "fake-region",
"fspath": "",
"storageDomain": "fake-domain",
@@ -409,10 +415,11 @@ func TestGetStorageVariables(t *testing.T) {
},
},
},
repoBackend: "fake-repo-type",
expected: map[string]string{
"fspath": "fake-path",
"bucket": "",
"prefix": "fake-prefix/unified-repo/",
"prefix": "fake-prefix/fake-repo-type/",
"region": "",
},
},
@@ -423,7 +430,7 @@ func TestGetStorageVariables(t *testing.T) {
getS3BucketRegion = tc.getS3BucketRegion
getAzureStorageDomain = tc.getAzureStorageDomain
actual, err := getStorageVariables(&tc.backupLocation, tc.repoName)
actual, err := getStorageVariables(&tc.backupLocation, tc.repoBackend, tc.repoName)
require.Equal(t, tc.expected, actual)
@@ -512,7 +519,7 @@ func TestGetStoreOptions(t *testing.T) {
BackupRepo: &velerov1api.BackupRepository{},
},
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
return map[string]string{}, errors.New("fake-error-2")
},
},
@@ -526,7 +533,7 @@ func TestGetStoreOptions(t *testing.T) {
BackupRepo: &velerov1api.BackupRepository{},
},
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -586,7 +593,7 @@ func TestPrepareRepo(t *testing.T) {
repoService: new(reposervicenmocks.BackupRepoService),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
return map[string]string{}, errors.New("fake-store-option-error")
},
},
@@ -597,7 +604,7 @@ func TestPrepareRepo(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -618,7 +625,7 @@ func TestPrepareRepo(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -695,7 +702,7 @@ func TestForget(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {
@@ -719,7 +726,7 @@ func TestForget(t *testing.T) {
getter: new(credmock.SecretStore),
credStoreReturn: "fake-password",
funcTable: localFuncTable{
getStorageVariables: func(*velerov1api.BackupStorageLocation, string) (map[string]string, error) {
getStorageVariables: func(*velerov1api.BackupStorageLocation, string, string) (map[string]string, error) {
return map[string]string{}, nil
},
getStorageCredentials: func(*velerov1api.BackupStorageLocation, velerocredentials.FileStore) (map[string]string, error) {

View File

@@ -19,6 +19,7 @@ package kopialib
import (
"context"
"os"
"runtime"
"strings"
"sync/atomic"
"time"
@@ -382,12 +383,7 @@ func (kor *kopiaObjectReader) Read(p []byte) (int, error) {
return 0, errors.New("object reader is closed or not open")
}
n, err := kor.rawReader.Read(p)
if err != nil {
return 0, errors.Wrap(err, "error to read object")
}
return n, nil
return kor.rawReader.Read(p)
}
func (kor *kopiaObjectReader) Seek(offset int64, whence int) (int64, error) {
@@ -395,12 +391,7 @@ func (kor *kopiaObjectReader) Seek(offset int64, whence int) (int64, error) {
return -1, errors.New("object reader is closed or not open")
}
p, err := kor.rawReader.Seek(offset, whence)
if err != nil {
return -1, errors.Wrap(err, "error to seek object")
}
return p, nil
return kor.rawReader.Seek(offset, whence)
}
func (kor *kopiaObjectReader) Close() error {
@@ -410,7 +401,7 @@ func (kor *kopiaObjectReader) Close() error {
err := kor.rawReader.Close()
if err != nil {
return errors.Wrap(err, "error to close object reader")
return err
}
kor.rawReader = nil
@@ -431,12 +422,7 @@ func (kow *kopiaObjectWriter) Write(p []byte) (int, error) {
return 0, errors.New("object writer is closed or not open")
}
n, err := kow.rawWriter.Write(p)
if err != nil {
return 0, errors.Wrap(err, "error to write object")
}
return n, nil
return kow.rawWriter.Write(p)
}
func (kow *kopiaObjectWriter) Seek(offset int64, whence int) (int64, error) {
@@ -476,7 +462,7 @@ func (kow *kopiaObjectWriter) Close() error {
err := kow.rawWriter.Close()
if err != nil {
return errors.Wrap(err, "error to close object writer")
return err
}
kow.rawWriter = nil
@@ -484,9 +470,9 @@ func (kow *kopiaObjectWriter) Close() error {
return nil
}
// getAsyncWrites returns the number of async writes, at present, we don't support async writes
// getAsyncWrites returns the number of concurrent async writes
func getAsyncWrites() int {
return 0
return runtime.NumCPU()
}
// getCompressorForObject returns the compressor for an object, at present, we don't support compression

View File

@@ -1,45 +0,0 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)
// GetBackupRepositoryByLabel which find backup repository through pvbNamespace, label
// name of BackupRepository is generated with prefix volumeNamespace-backupLocation- and end with random characters
// it could not retrieve the BackupRepository CR with namespace + name. so first list all CRs with in the pvbNamespace
// then filtering the matched CR by label
func GetBackupRepositoryByLabel(ctx context.Context, cli client.Client, pvbNamespace string, selector labels.Selector) (velerov1api.BackupRepository, error) {
backupRepoList := &velerov1api.BackupRepositoryList{}
if err := cli.List(ctx, backupRepoList, &client.ListOptions{
Namespace: pvbNamespace,
LabelSelector: selector,
}); err != nil {
return velerov1api.BackupRepository{}, errors.Wrap(err, "error getting backup repository list")
} else if len(backupRepoList.Items) == 1 {
return backupRepoList.Items[0], nil
} else {
return velerov1api.BackupRepository{}, errors.Errorf("unexpectedly find %d BackupRepository for workload namespace %s with label selector %v", len(backupRepoList.Items), pvbNamespace, selector)
}
}

View File

@@ -30,6 +30,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/logging"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/localfs"
@@ -110,7 +111,9 @@ func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter rep
if err != nil {
return nil, false, errors.Wrap(err, "Unable to get local filesystem entry")
}
snapID, snapshotSize, err := SnapshotSource(ctx, repoWriter, fsUploader, sourceInfo, rootDir, parentSnapshot, log, "Kopia Uploader")
kopiaCtx := logging.SetupKopiaLog(ctx, log)
snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, rootDir, parentSnapshot, log, "Kopia Uploader")
if err != nil {
return nil, false, err
}
@@ -261,7 +264,9 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour
func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
log.Info("Start to restore...")
rootEntry, err := snapshotfs.FilesystemEntryFromIDWithPath(ctx, rep, snapshotID, false)
kopiaCtx := logging.SetupKopiaLog(ctx, log)
rootEntry, err := snapshotfs.FilesystemEntryFromIDWithPath(kopiaCtx, rep, snapshotID, false)
if err != nil {
return 0, 0, errors.Wrapf(err, "Unable to get filesystem entry for snapshot %v", snapshotID)
}
@@ -279,7 +284,7 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *KopiaProg
IgnorePermissionErrors: true,
}
stat, err := restore.Entry(ctx, rep, output, rootEntry, restore.Options{
stat, err := restore.Entry(kopiaCtx, rep, output, rootEntry, restore.Options{
Parallel: runtime.NumCPU(),
RestoreDirEntryAtDepth: math.MaxInt32,
Cancel: cancleCh,

View File

@@ -73,7 +73,10 @@ func NewUploaderProvider(
return nil, errors.New("uninitialized FileStore credentail is not supported")
}
if uploaderType == uploader.KopiaType {
if err := provider.NewUnifiedRepoProvider(*credGetter, log).ConnectToRepo(ctx, provider.RepoParam{BackupLocation: bsl, BackupRepo: backupRepo}); err != nil {
// We use the hardcode repositoryType velerov1api.BackupRepositoryTypeKopia for now, because we have only one implementation of unified repo.
// TODO: post v1.10, replace the hardcode. In future, when we have multiple implementations of Unified Repo (besides Kopia), we will add the
// repositoryType to BSL, because by then, we are not able to hardcode the repositoryType to BackupRepositoryTypeKopia for Unified Repo.
if err := provider.NewUnifiedRepoProvider(*credGetter, velerov1api.BackupRepositoryTypeKopia, log).ConnectToRepo(ctx, provider.RepoParam{BackupLocation: bsl, BackupRepo: backupRepo}); err != nil {
return nil, errors.Wrap(err, "failed to connect repository")
}
return NewKopiaUploaderProvider(ctx, credGetter, backupRepo, log)

View File

@@ -120,7 +120,10 @@ func (rp *resticProvider) RunBackup(
backupCmd := ResticBackupCMDFunc(rp.repoIdentifier, rp.credentialsFile, path, tags)
backupCmd.Env = rp.cmdEnv
backupCmd.CACertFile = rp.caCertFile
backupCmd.ExtraFlags = rp.extraFlags
if len(rp.extraFlags) != 0 {
backupCmd.ExtraFlags = append(backupCmd.ExtraFlags, rp.extraFlags...)
}
if parentSnapshot != "" {
backupCmd.ExtraFlags = append(backupCmd.ExtraFlags, fmt.Sprintf("--parent=%s", parentSnapshot))
}

View File

@@ -40,22 +40,22 @@ func SetupKopiaLog(ctx context.Context, logger logrus.FieldLogger) context.Conte
}
func (kl *kopiaLog) Debugf(msg string, args ...interface{}) {
logger := kl.logger.WithField("logSource", kl.getLogSource())
logger := kl.logger.WithField("logModule", kl.getLogModule())
logger.Debugf(msg, args...)
}
func (kl *kopiaLog) Debugw(msg string, keyValuePairs ...interface{}) {
logger := kl.logger.WithField("logSource", kl.getLogSource())
logger := kl.logger.WithField("logModule", kl.getLogModule())
logger.WithFields(getLogFields(keyValuePairs...)).Debug(msg)
}
func (kl *kopiaLog) Infof(msg string, args ...interface{}) {
logger := kl.logger.WithField("logSource", kl.getLogSource())
logger := kl.logger.WithField("logModule", kl.getLogModule())
logger.Infof(msg, args...)
}
func (kl *kopiaLog) Warnf(msg string, args ...interface{}) {
logger := kl.logger.WithField("logSource", kl.getLogSource())
logger := kl.logger.WithField("logModule", kl.getLogModule())
logger.Warnf(msg, args...)
}
@@ -64,14 +64,14 @@ func (kl *kopiaLog) Warnf(msg string, args ...interface{}) {
// affect Velero's workflow.
func (kl *kopiaLog) Errorf(msg string, args ...interface{}) {
logger := kl.logger.WithFields(logrus.Fields{
"logSource": kl.getLogSource(),
"logModule": kl.getLogModule(),
"sublevel": "error",
})
logger.Warnf(msg, args...)
}
func (kl *kopiaLog) getLogSource() string {
func (kl *kopiaLog) getLogModule() string {
return "kopia/" + kl.module
}