mirror of
https://github.com/vmware-tanzu/velero.git
synced 2025-12-23 14:25:22 +00:00
fix backup repo init error
Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
1
changelogs/unreleased/9407-Lyndon-Li
Normal file
1
changelogs/unreleased/9407-Lyndon-Li
Normal file
@@ -0,0 +1 @@
|
||||
Fix issue #9400, connect repo first time after creation so that init params could be written
|
||||
@@ -189,7 +189,7 @@ func (urp *unifiedRepoProvider) PrepareRepo(ctx context.Context, param RepoParam
|
||||
"repo UID": param.BackupRepo.UID,
|
||||
})
|
||||
|
||||
log.Debug("Start to prepare repo")
|
||||
log.Info("Start to prepare repo")
|
||||
|
||||
repoOption, err := udmrepo.NewRepoOptions(
|
||||
udmrepo.WithPassword(urp, param),
|
||||
@@ -211,7 +211,7 @@ func (urp *unifiedRepoProvider) PrepareRepo(ctx context.Context, param RepoParam
|
||||
if created, err := urp.repoService.IsCreated(ctx, *repoOption); err != nil {
|
||||
return errors.Wrap(err, "error to check backup repo")
|
||||
} else if created {
|
||||
log.Debug("Repo has already been initialized remotely")
|
||||
log.Info("Repo has already been initialized")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -224,7 +224,7 @@ func (urp *unifiedRepoProvider) PrepareRepo(ctx context.Context, param RepoParam
|
||||
return errors.Wrap(err, "error to create backup repo")
|
||||
}
|
||||
|
||||
log.Debug("Prepare repo complete")
|
||||
log.Info("Prepare repo complete")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -18,12 +18,15 @@ package backend
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
)
|
||||
|
||||
var ErrStoreNotExist = errors.New("store does not exist")
|
||||
|
||||
// Store defines the methods for Kopia to establish a connection to
|
||||
// the backend storage
|
||||
type Store interface {
|
||||
|
||||
@@ -92,3 +92,10 @@ func SetupConnectOptions(ctx context.Context, repoOptions udmrepo.RepoOptions) r
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func RepoOwnerFromRepoOptions(repoOptions udmrepo.RepoOptions) string {
|
||||
hostname := optionalHaveStringWithDefault(udmrepo.GenOptionOwnerDomain, repoOptions.GeneralOptions, udmrepo.GetRepoDomain())
|
||||
username := optionalHaveStringWithDefault(udmrepo.GenOptionOwnerName, repoOptions.GeneralOptions, udmrepo.GetRepoUser())
|
||||
|
||||
return username + "@" + hostname
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ package backend
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -62,6 +63,13 @@ func (c *FsBackend) Connect(ctx context.Context, isCreate bool, logger logrus.Fi
|
||||
if !filepath.IsAbs(c.options.Path) {
|
||||
return nil, errors.Errorf("filesystem repository path is not absolute, path: %s", c.options.Path)
|
||||
}
|
||||
|
||||
if !isCreate {
|
||||
if _, err := os.Stat(c.options.Path); err != nil {
|
||||
return nil, ErrStoreNotExist
|
||||
}
|
||||
}
|
||||
|
||||
ctx = logging.WithLogger(ctx, logger)
|
||||
|
||||
return filesystem.New(ctx, &c.options, isCreate)
|
||||
|
||||
@@ -98,11 +98,26 @@ func NewKopiaRepoService(logger logrus.FieldLogger) udmrepo.BackupRepoService {
|
||||
func (ks *kopiaRepoService) Create(ctx context.Context, repoOption udmrepo.RepoOptions) error {
|
||||
repoCtx := kopia.SetupKopiaLog(ctx, ks.logger)
|
||||
|
||||
if err := CreateBackupRepo(repoCtx, repoOption, ks.logger); err != nil {
|
||||
return err
|
||||
status, err := GetRepositoryStatus(ctx, repoOption, ks.logger)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error getting repo status")
|
||||
}
|
||||
|
||||
return writeInitParameters(repoCtx, repoOption, ks.logger)
|
||||
if status != RepoStatusSystemNotCreated && status != RepoStatusNotInitialized {
|
||||
return errors.Errorf("unexpected repo status %v", status)
|
||||
}
|
||||
|
||||
if status == RepoStatusSystemNotCreated {
|
||||
if err := CreateBackupRepo(repoCtx, repoOption, ks.logger); err != nil {
|
||||
return errors.Wrap(err, "error creating backup repo")
|
||||
}
|
||||
}
|
||||
|
||||
if err := InitializeBackupRepo(ctx, repoOption, ks.logger); err != nil {
|
||||
return errors.Wrap(err, "error initializing backup repo")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ks *kopiaRepoService) Connect(ctx context.Context, repoOption udmrepo.RepoOptions) error {
|
||||
@@ -114,7 +129,17 @@ func (ks *kopiaRepoService) Connect(ctx context.Context, repoOption udmrepo.Repo
|
||||
func (ks *kopiaRepoService) IsCreated(ctx context.Context, repoOption udmrepo.RepoOptions) (bool, error) {
|
||||
repoCtx := kopia.SetupKopiaLog(ctx, ks.logger)
|
||||
|
||||
return IsBackupRepoCreated(repoCtx, repoOption, ks.logger)
|
||||
status, err := GetRepositoryStatus(repoCtx, repoOption, ks.logger)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if status != RepoStatusCreated {
|
||||
ks.logger.Infof("Repo is not fully created, status %v", status)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (ks *kopiaRepoService) Open(ctx context.Context, repoOption udmrepo.RepoOptions) (udmrepo.BackupRepo, error) {
|
||||
@@ -612,73 +637,3 @@ func openKopiaRepo(ctx context.Context, configFile string, password string, _ *o
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func writeInitParameters(ctx context.Context, repoOption udmrepo.RepoOptions, logger logrus.FieldLogger) error {
|
||||
r, err := openKopiaRepo(ctx, repoOption.ConfigFilePath, repoOption.RepoPassword, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
c := r.Close(ctx)
|
||||
if c != nil {
|
||||
logger.WithError(c).Error("Failed to close repo")
|
||||
}
|
||||
}()
|
||||
|
||||
err = repo.WriteSession(ctx, r, repo.WriteSessionOptions{
|
||||
Purpose: "set init parameters",
|
||||
}, func(ctx context.Context, w repo.RepositoryWriter) error {
|
||||
p := maintenance.DefaultParams()
|
||||
|
||||
if overwriteFullMaintainInterval != time.Duration(0) {
|
||||
logger.Infof("Full maintenance interval change from %v to %v", p.FullCycle.Interval, overwriteFullMaintainInterval)
|
||||
p.FullCycle.Interval = overwriteFullMaintainInterval
|
||||
}
|
||||
|
||||
if overwriteQuickMaintainInterval != time.Duration(0) {
|
||||
logger.Infof("Quick maintenance interval change from %v to %v", p.QuickCycle.Interval, overwriteQuickMaintainInterval)
|
||||
p.QuickCycle.Interval = overwriteQuickMaintainInterval
|
||||
}
|
||||
// the repoOption.StorageOptions are set via
|
||||
// udmrepo.WithStoreOptions -> udmrepo.GetStoreOptions (interface)
|
||||
// -> pkg/repository/provider.GetStoreOptions(param interface{}) -> pkg/repository/provider.getStorageVariables(..., backupRepoConfig)
|
||||
// where backupRepoConfig comes from param.(RepoParam).BackupRepo.Spec.RepositoryConfig map[string]string
|
||||
// where RepositoryConfig comes from pkg/controller/getBackupRepositoryConfig(...)
|
||||
// where it gets a configMap name from pkg/cmd/server/config/Config.BackupRepoConfig
|
||||
// which gets set via velero server flag `backup-repository-configmap` "The name of ConfigMap containing backup repository configurations."
|
||||
// and data stored as json under ConfigMap.Data[repoType] where repoType is BackupRepository.Spec.RepositoryType: either kopia or restic
|
||||
// repoOption.StorageOptions[udmrepo.StoreOptionKeyFullMaintenanceInterval] would for example look like
|
||||
// configMapName.data.kopia: {"fullMaintenanceInterval": "eagerGC"}
|
||||
fullMaintIntervalOption := udmrepo.FullMaintenanceIntervalOptions(repoOption.StorageOptions[udmrepo.StoreOptionKeyFullMaintenanceInterval])
|
||||
priorMaintInterval := p.FullCycle.Interval
|
||||
switch fullMaintIntervalOption {
|
||||
case udmrepo.FastGC:
|
||||
p.FullCycle.Interval = udmrepo.FastGCInterval
|
||||
case udmrepo.EagerGC:
|
||||
p.FullCycle.Interval = udmrepo.EagerGCInterval
|
||||
case udmrepo.NormalGC:
|
||||
p.FullCycle.Interval = udmrepo.NormalGCInterval
|
||||
case "": // do nothing
|
||||
default:
|
||||
return errors.Errorf("invalid full maintenance interval option %s", fullMaintIntervalOption)
|
||||
}
|
||||
if priorMaintInterval != p.FullCycle.Interval {
|
||||
logger.Infof("Full maintenance interval change from %v to %v", priorMaintInterval, p.FullCycle.Interval)
|
||||
}
|
||||
|
||||
p.Owner = r.ClientOptions().UsernameAtHost()
|
||||
|
||||
if err := maintenance.SetParams(ctx, w, &p); err != nil {
|
||||
return errors.Wrap(err, "error to set maintenance params")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error to init write repo parameters")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -24,7 +24,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/maintenance"
|
||||
"github.com/kopia/kopia/repo/manifest"
|
||||
"github.com/kopia/kopia/repo/object"
|
||||
"github.com/pkg/errors"
|
||||
@@ -264,177 +263,6 @@ func TestMaintain(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteInitParameters(t *testing.T) {
|
||||
var directRpo *repomocks.DirectRepository
|
||||
assertFullMaintIntervalEqual := func(expected, actual *maintenance.Params) bool {
|
||||
return assert.Equal(t, expected.FullCycle.Interval, actual.FullCycle.Interval)
|
||||
}
|
||||
testCases := []struct {
|
||||
name string
|
||||
repoOptions udmrepo.RepoOptions
|
||||
returnRepo *repomocks.DirectRepository
|
||||
returnRepoWriter *repomocks.DirectRepositoryWriter
|
||||
repoOpen func(context.Context, string, string, *repo.Options) (repo.Repository, error)
|
||||
newRepoWriterError error
|
||||
replaceManifestError error
|
||||
// expected replacemanifest params to be received by maintenance.SetParams, and therefore writeInitParameters
|
||||
expectedReplaceManifestsParams *maintenance.Params
|
||||
// allows for asserting only certain fields are set as expected
|
||||
assertReplaceManifestsParams func(*maintenance.Params, *maintenance.Params) bool
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "repo open fail, repo not exist",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
GeneralOptions: map[string]string{},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return nil, os.ErrNotExist
|
||||
},
|
||||
expectedErr: "error to open repo, repo doesn't exist: file does not exist",
|
||||
},
|
||||
{
|
||||
name: "repo open fail, other error",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
GeneralOptions: map[string]string{},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return nil, errors.New("fake-repo-open-error")
|
||||
},
|
||||
expectedErr: "error to open repo: fake-repo-open-error",
|
||||
},
|
||||
{
|
||||
name: "write session fail",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
GeneralOptions: map[string]string{},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return directRpo, nil
|
||||
},
|
||||
returnRepo: new(repomocks.DirectRepository),
|
||||
newRepoWriterError: errors.New("fake-new-writer-error"),
|
||||
expectedErr: "error to init write repo parameters: unable to create writer: fake-new-writer-error",
|
||||
},
|
||||
{
|
||||
name: "set repo param fail",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
GeneralOptions: map[string]string{},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return directRpo, nil
|
||||
},
|
||||
returnRepo: new(repomocks.DirectRepository),
|
||||
returnRepoWriter: new(repomocks.DirectRepositoryWriter),
|
||||
replaceManifestError: errors.New("fake-replace-manifest-error"),
|
||||
expectedErr: "error to init write repo parameters: error to set maintenance params: put manifest: fake-replace-manifest-error",
|
||||
},
|
||||
{
|
||||
name: "repo with maintenance interval has expected params",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
StorageOptions: map[string]string{
|
||||
udmrepo.StoreOptionKeyFullMaintenanceInterval: string(udmrepo.FastGC),
|
||||
},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return directRpo, nil
|
||||
},
|
||||
returnRepo: new(repomocks.DirectRepository),
|
||||
returnRepoWriter: new(repomocks.DirectRepositoryWriter),
|
||||
expectedReplaceManifestsParams: &maintenance.Params{
|
||||
FullCycle: maintenance.CycleParams{
|
||||
Interval: udmrepo.FastGCInterval,
|
||||
},
|
||||
},
|
||||
assertReplaceManifestsParams: assertFullMaintIntervalEqual,
|
||||
},
|
||||
{
|
||||
name: "repo with empty maintenance interval has expected params",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
StorageOptions: map[string]string{
|
||||
udmrepo.StoreOptionKeyFullMaintenanceInterval: string(""),
|
||||
},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return directRpo, nil
|
||||
},
|
||||
returnRepo: new(repomocks.DirectRepository),
|
||||
returnRepoWriter: new(repomocks.DirectRepositoryWriter),
|
||||
expectedReplaceManifestsParams: &maintenance.Params{
|
||||
FullCycle: maintenance.CycleParams{
|
||||
Interval: udmrepo.NormalGCInterval,
|
||||
},
|
||||
},
|
||||
assertReplaceManifestsParams: assertFullMaintIntervalEqual,
|
||||
},
|
||||
{
|
||||
name: "repo with invalid maintenance interval has expected errors",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
StorageOptions: map[string]string{
|
||||
udmrepo.StoreOptionKeyFullMaintenanceInterval: string("foo"),
|
||||
},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return directRpo, nil
|
||||
},
|
||||
returnRepo: new(repomocks.DirectRepository),
|
||||
returnRepoWriter: new(repomocks.DirectRepositoryWriter),
|
||||
expectedErr: "error to init write repo parameters: invalid full maintenance interval option foo",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
logger := velerotest.NewLogger()
|
||||
ctx := t.Context()
|
||||
|
||||
if tc.repoOpen != nil {
|
||||
kopiaRepoOpen = tc.repoOpen
|
||||
}
|
||||
|
||||
if tc.returnRepo != nil {
|
||||
directRpo = tc.returnRepo
|
||||
}
|
||||
|
||||
if tc.returnRepo != nil {
|
||||
tc.returnRepo.On("NewWriter", mock.Anything, mock.Anything).Return(ctx, tc.returnRepoWriter, tc.newRepoWriterError)
|
||||
tc.returnRepo.On("ClientOptions").Return(repo.ClientOptions{})
|
||||
tc.returnRepo.On("Close", mock.Anything).Return(nil)
|
||||
}
|
||||
|
||||
if tc.returnRepoWriter != nil {
|
||||
tc.returnRepoWriter.On("Close", mock.Anything).Return(nil)
|
||||
if tc.replaceManifestError != nil {
|
||||
tc.returnRepoWriter.On("ReplaceManifests", mock.Anything, mock.Anything, mock.Anything).Return(manifest.ID(""), tc.replaceManifestError)
|
||||
}
|
||||
if tc.expectedReplaceManifestsParams != nil {
|
||||
tc.returnRepoWriter.On("ReplaceManifests", mock.Anything, mock.AnythingOfType("map[string]string"), mock.AnythingOfType("*maintenance.Params")).Return(manifest.ID(""), nil)
|
||||
tc.returnRepoWriter.On("Flush", mock.Anything).Return(nil)
|
||||
}
|
||||
}
|
||||
|
||||
err := writeInitParameters(ctx, tc.repoOptions, logger)
|
||||
|
||||
if tc.expectedErr == "" {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
require.EqualError(t, err, tc.expectedErr)
|
||||
}
|
||||
if tc.expectedReplaceManifestsParams != nil {
|
||||
actualReplaceManifestsParams, converted := tc.returnRepoWriter.Calls[0].Arguments.Get(2).(*maintenance.Params)
|
||||
assert.True(t, converted)
|
||||
tc.assertReplaceManifestsParams(tc.expectedReplaceManifestsParams, actualReplaceManifestsParams)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldLog(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
|
||||
@@ -18,13 +18,18 @@ package kopialib
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"slices"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/format"
|
||||
"github.com/kopia/kopia/repo/maintenance"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
|
||||
@@ -45,6 +50,22 @@ var backendStores = []kopiaBackendStore{
|
||||
{udmrepo.StorageTypeS3, "an S3 bucket", &backend.S3Backend{}},
|
||||
}
|
||||
|
||||
const udmRepoBlobID = "udmrepo.Repository"
|
||||
|
||||
type udmRepoMetadata struct {
|
||||
UniqueID []byte `json:"uniqueID"`
|
||||
}
|
||||
|
||||
type RepoStatus int
|
||||
|
||||
const (
|
||||
RepoStatusUnknown = 0
|
||||
RepoStatusCorrupted = 1
|
||||
RepoStatusSystemNotCreated = 2
|
||||
RepoStatusNotInitialized = 3
|
||||
RepoStatusCreated = 4
|
||||
)
|
||||
|
||||
// CreateBackupRepo creates a Kopia repository and then connect to it.
|
||||
// The storage must be empty, otherwise, it will fail
|
||||
func CreateBackupRepo(ctx context.Context, repoOption udmrepo.RepoOptions, logger logrus.FieldLogger) error {
|
||||
@@ -73,14 +94,9 @@ func ConnectBackupRepo(ctx context.Context, repoOption udmrepo.RepoOptions, logg
|
||||
return errors.New("invalid config file path")
|
||||
}
|
||||
|
||||
backendStore, err := setupBackendStore(ctx, repoOption.StorageType, repoOption.StorageOptions, logger)
|
||||
st, err := connectStore(ctx, repoOption, logger)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error to setup backend storage")
|
||||
}
|
||||
|
||||
st, err := backendStore.store.Connect(ctx, false, logger)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error to connect to storage")
|
||||
return err
|
||||
}
|
||||
|
||||
err = connectWithStorage(ctx, st, repoOption)
|
||||
@@ -91,32 +107,119 @@ func ConnectBackupRepo(ctx context.Context, repoOption udmrepo.RepoOptions, logg
|
||||
return nil
|
||||
}
|
||||
|
||||
func IsBackupRepoCreated(ctx context.Context, repoOption udmrepo.RepoOptions, logger logrus.FieldLogger) (bool, error) {
|
||||
backendStore, err := setupBackendStore(ctx, repoOption.StorageType, repoOption.StorageOptions, logger)
|
||||
if err != nil {
|
||||
return false, errors.Wrap(err, "error to setup backend storage")
|
||||
}
|
||||
|
||||
st, err := backendStore.store.Connect(ctx, false, logger)
|
||||
if err != nil {
|
||||
return false, errors.Wrap(err, "error to connect to storage")
|
||||
func GetRepositoryStatus(ctx context.Context, repoOption udmrepo.RepoOptions, logger logrus.FieldLogger) (RepoStatus, error) {
|
||||
st, err := connectStore(ctx, repoOption, logger)
|
||||
if errors.Is(err, backend.ErrStoreNotExist) {
|
||||
return RepoStatusSystemNotCreated, nil
|
||||
} else if err != nil {
|
||||
return RepoStatusUnknown, err
|
||||
}
|
||||
|
||||
var formatBytes byteBuffer
|
||||
if err := st.GetBlob(ctx, format.KopiaRepositoryBlobID, 0, -1, &formatBytes); err != nil {
|
||||
if errors.Is(err, blob.ErrBlobNotFound) {
|
||||
return false, nil
|
||||
logger.Debug("Kopia repository blob is not found")
|
||||
return RepoStatusSystemNotCreated, nil
|
||||
}
|
||||
|
||||
return false, errors.Wrap(err, "error to read format blob")
|
||||
return RepoStatusUnknown, errors.Wrap(err, "error reading format blob")
|
||||
}
|
||||
|
||||
_, err = format.ParseKopiaRepositoryJSON(formatBytes.buffer)
|
||||
repoFmt, err := format.ParseKopiaRepositoryJSON(formatBytes.buffer)
|
||||
if err != nil {
|
||||
return false, err
|
||||
return RepoStatusCorrupted, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
var initInfoBytes byteBuffer
|
||||
if err := st.GetBlob(ctx, udmRepoBlobID, 0, -1, &initInfoBytes); err != nil {
|
||||
if errors.Is(err, blob.ErrBlobNotFound) {
|
||||
logger.Debug("Udm repo metadata blob is not found")
|
||||
return RepoStatusNotInitialized, nil
|
||||
}
|
||||
|
||||
return RepoStatusUnknown, errors.Wrap(err, "error reading udm repo blob")
|
||||
}
|
||||
|
||||
udmpRepo := &udmRepoMetadata{}
|
||||
if err := json.Unmarshal(initInfoBytes.buffer, udmpRepo); err != nil {
|
||||
return RepoStatusCorrupted, errors.Wrap(err, "invalid udm repo blob")
|
||||
}
|
||||
|
||||
if !slices.Equal(udmpRepo.UniqueID, repoFmt.UniqueID) {
|
||||
return RepoStatusCorrupted, errors.Errorf("unique ID doesn't match: %v(%v)", udmpRepo.UniqueID, repoFmt.UniqueID)
|
||||
}
|
||||
|
||||
return RepoStatusCreated, nil
|
||||
}
|
||||
|
||||
func InitializeBackupRepo(ctx context.Context, repoOption udmrepo.RepoOptions, logger logrus.FieldLogger) error {
|
||||
if repoOption.ConfigFilePath == "" {
|
||||
return errors.New("invalid config file path")
|
||||
}
|
||||
|
||||
st, err := connectStore(ctx, repoOption, logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = connectWithStorage(ctx, st, repoOption)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error connecting repo with storage")
|
||||
}
|
||||
|
||||
err = writeInitParameters(ctx, repoOption, logger)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error writing init parameters")
|
||||
}
|
||||
|
||||
err = writeUdmRepoMetadata(ctx, st)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error writing udm repo metadata")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeUdmRepoMetadata(ctx context.Context, st blob.Storage) error {
|
||||
var formatBytes byteBuffer
|
||||
if err := st.GetBlob(ctx, format.KopiaRepositoryBlobID, 0, -1, &formatBytes); err != nil {
|
||||
return errors.Wrap(err, "error reading format blob")
|
||||
}
|
||||
|
||||
repoFmt, err := format.ParseKopiaRepositoryJSON(formatBytes.buffer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
udmpRepo := &udmRepoMetadata{
|
||||
UniqueID: repoFmt.UniqueID,
|
||||
}
|
||||
|
||||
bytes, err := json.Marshal(udmpRepo)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error marshaling udm repo metadata")
|
||||
}
|
||||
|
||||
err = st.PutBlob(ctx, udmRepoBlobID, &byteBuffer{bytes}, blob.PutOptions{})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error writing udm repo metadata")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func connectStore(ctx context.Context, repoOption udmrepo.RepoOptions, logger logrus.FieldLogger) (blob.Storage, error) {
|
||||
backendStore, err := setupBackendStore(ctx, repoOption.StorageType, repoOption.StorageOptions, logger)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error to setup backend storage")
|
||||
}
|
||||
|
||||
st, err := backendStore.store.Connect(ctx, false, logger)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error to connect to storage")
|
||||
}
|
||||
|
||||
return st, nil
|
||||
}
|
||||
|
||||
func findBackendStore(storage string) *kopiaBackendStore {
|
||||
@@ -185,11 +288,21 @@ type byteBuffer struct {
|
||||
buffer []byte
|
||||
}
|
||||
|
||||
func (b *byteBuffer) Write(p []byte) (n int, err error) {
|
||||
type byteBufferReader struct {
|
||||
buffer []byte
|
||||
pos int
|
||||
}
|
||||
|
||||
func (b *byteBuffer) Write(p []byte) (int, error) {
|
||||
b.buffer = append(b.buffer, p...)
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (b *byteBuffer) WriteTo(w io.Writer) (int64, error) {
|
||||
n, err := w.Write(b.buffer)
|
||||
return int64(n), err
|
||||
}
|
||||
|
||||
func (b *byteBuffer) Reset() {
|
||||
b.buffer = nil
|
||||
}
|
||||
@@ -197,3 +310,129 @@ func (b *byteBuffer) Reset() {
|
||||
func (b *byteBuffer) Length() int {
|
||||
return len(b.buffer)
|
||||
}
|
||||
|
||||
func (b *byteBuffer) Reader() io.ReadSeekCloser {
|
||||
return &byteBufferReader{buffer: b.buffer}
|
||||
}
|
||||
|
||||
func (b *byteBufferReader) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *byteBufferReader) Read(out []byte) (int, error) {
|
||||
if b.pos == len(b.buffer) {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
copied := copy(out, b.buffer[b.pos:])
|
||||
b.pos += copied
|
||||
|
||||
return copied, nil
|
||||
}
|
||||
|
||||
func (b *byteBufferReader) Seek(offset int64, whence int) (int64, error) {
|
||||
newOffset := b.pos
|
||||
|
||||
switch whence {
|
||||
case io.SeekStart:
|
||||
newOffset = int(offset)
|
||||
case io.SeekCurrent:
|
||||
newOffset += int(offset)
|
||||
case io.SeekEnd:
|
||||
newOffset = len(b.buffer) + int(offset)
|
||||
}
|
||||
|
||||
if newOffset < 0 || newOffset > len(b.buffer) {
|
||||
return -1, errors.New("invalid seek")
|
||||
}
|
||||
|
||||
b.pos = newOffset
|
||||
|
||||
return int64(newOffset), nil
|
||||
}
|
||||
|
||||
var funcGetParam = maintenance.GetParams
|
||||
|
||||
func writeInitParameters(ctx context.Context, repoOption udmrepo.RepoOptions, logger logrus.FieldLogger) error {
|
||||
r, err := openKopiaRepo(ctx, repoOption.ConfigFilePath, repoOption.RepoPassword, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
c := r.Close(ctx)
|
||||
if c != nil {
|
||||
logger.WithError(c).Error("Failed to close repo")
|
||||
}
|
||||
}()
|
||||
|
||||
params, err := funcGetParam(ctx, r)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error getting existing maintenance params")
|
||||
}
|
||||
|
||||
if params.Owner == backend.RepoOwnerFromRepoOptions(repoOption) {
|
||||
logger.Warn("Init parameters already exists, skip")
|
||||
return nil
|
||||
}
|
||||
|
||||
if params.Owner != "" {
|
||||
logger.Warnf("Overwriting existing init params %v", params)
|
||||
}
|
||||
|
||||
err = repo.WriteSession(ctx, r, repo.WriteSessionOptions{
|
||||
Purpose: "set init parameters",
|
||||
}, func(ctx context.Context, w repo.RepositoryWriter) error {
|
||||
p := maintenance.DefaultParams()
|
||||
|
||||
if overwriteFullMaintainInterval != time.Duration(0) {
|
||||
logger.Infof("Full maintenance interval change from %v to %v", p.FullCycle.Interval, overwriteFullMaintainInterval)
|
||||
p.FullCycle.Interval = overwriteFullMaintainInterval
|
||||
}
|
||||
|
||||
if overwriteQuickMaintainInterval != time.Duration(0) {
|
||||
logger.Infof("Quick maintenance interval change from %v to %v", p.QuickCycle.Interval, overwriteQuickMaintainInterval)
|
||||
p.QuickCycle.Interval = overwriteQuickMaintainInterval
|
||||
}
|
||||
// the repoOption.StorageOptions are set via
|
||||
// udmrepo.WithStoreOptions -> udmrepo.GetStoreOptions (interface)
|
||||
// -> pkg/repository/provider.GetStoreOptions(param interface{}) -> pkg/repository/provider.getStorageVariables(..., backupRepoConfig)
|
||||
// where backupRepoConfig comes from param.(RepoParam).BackupRepo.Spec.RepositoryConfig map[string]string
|
||||
// where RepositoryConfig comes from pkg/controller/getBackupRepositoryConfig(...)
|
||||
// where it gets a configMap name from pkg/cmd/server/config/Config.BackupRepoConfig
|
||||
// which gets set via velero server flag `backup-repository-configmap` "The name of ConfigMap containing backup repository configurations."
|
||||
// and data stored as json under ConfigMap.Data[repoType] where repoType is BackupRepository.Spec.RepositoryType: either kopia or restic
|
||||
// repoOption.StorageOptions[udmrepo.StoreOptionKeyFullMaintenanceInterval] would for example look like
|
||||
// configMapName.data.kopia: {"fullMaintenanceInterval": "eagerGC"}
|
||||
fullMaintIntervalOption := udmrepo.FullMaintenanceIntervalOptions(repoOption.StorageOptions[udmrepo.StoreOptionKeyFullMaintenanceInterval])
|
||||
priorMaintInterval := p.FullCycle.Interval
|
||||
switch fullMaintIntervalOption {
|
||||
case udmrepo.FastGC:
|
||||
p.FullCycle.Interval = udmrepo.FastGCInterval
|
||||
case udmrepo.EagerGC:
|
||||
p.FullCycle.Interval = udmrepo.EagerGCInterval
|
||||
case udmrepo.NormalGC:
|
||||
p.FullCycle.Interval = udmrepo.NormalGCInterval
|
||||
case "": // do nothing
|
||||
default:
|
||||
return errors.Errorf("invalid full maintenance interval option %s", fullMaintIntervalOption)
|
||||
}
|
||||
if priorMaintInterval != p.FullCycle.Interval {
|
||||
logger.Infof("Full maintenance interval change from %v to %v", priorMaintInterval, p.FullCycle.Interval)
|
||||
}
|
||||
|
||||
p.Owner = r.ClientOptions().UsernameAtHost()
|
||||
|
||||
if err := maintenance.SetParams(ctx, w, &p); err != nil {
|
||||
return errors.Wrap(err, "error to set maintenance params")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error to init write repo parameters")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -18,9 +18,14 @@ package kopialib
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/maintenance"
|
||||
"github.com/kopia/kopia/repo/manifest"
|
||||
|
||||
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||
|
||||
@@ -29,6 +34,8 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/backend"
|
||||
repomocks "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/backend/mocks"
|
||||
storagemocks "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/backend/mocks"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@@ -239,7 +246,7 @@ func TestConnectBackupRepo(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsBackupRepoCreated(t *testing.T) {
|
||||
func TestGetRepositoryStatus(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
backendStore *storagemocks.Store
|
||||
@@ -248,7 +255,7 @@ func TestIsBackupRepoCreated(t *testing.T) {
|
||||
setupError error
|
||||
returnStore *storagemocks.Storage
|
||||
retFuncGetBlob func(context.Context, blob.ID, int64, int64, blob.OutputBuffer) error
|
||||
expected bool
|
||||
expected RepoStatus
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
@@ -256,6 +263,7 @@ func TestIsBackupRepoCreated(t *testing.T) {
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "fake-file",
|
||||
},
|
||||
expected: RepoStatusUnknown,
|
||||
expectedErr: "error to setup backend storage: error to find storage type",
|
||||
},
|
||||
{
|
||||
@@ -266,6 +274,7 @@ func TestIsBackupRepoCreated(t *testing.T) {
|
||||
},
|
||||
backendStore: new(storagemocks.Store),
|
||||
setupError: errors.New("fake-setup-error"),
|
||||
expected: RepoStatusUnknown,
|
||||
expectedErr: "error to setup backend storage: error to setup storage: fake-setup-error",
|
||||
},
|
||||
{
|
||||
@@ -276,10 +285,21 @@ func TestIsBackupRepoCreated(t *testing.T) {
|
||||
},
|
||||
backendStore: new(storagemocks.Store),
|
||||
connectErr: errors.New("fake-connect-error"),
|
||||
expected: RepoStatusUnknown,
|
||||
expectedErr: "error to connect to storage: fake-connect-error",
|
||||
},
|
||||
{
|
||||
name: "get blob error",
|
||||
name: "storage not exist",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "fake-file",
|
||||
StorageType: udmrepo.StorageTypeAzure,
|
||||
},
|
||||
backendStore: new(storagemocks.Store),
|
||||
connectErr: backend.ErrStoreNotExist,
|
||||
expected: RepoStatusSystemNotCreated,
|
||||
},
|
||||
{
|
||||
name: "get repo blob error",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "fake-file",
|
||||
StorageType: udmrepo.StorageTypeAzure,
|
||||
@@ -289,10 +309,24 @@ func TestIsBackupRepoCreated(t *testing.T) {
|
||||
retFuncGetBlob: func(context.Context, blob.ID, int64, int64, blob.OutputBuffer) error {
|
||||
return errors.New("fake-get-blob-error")
|
||||
},
|
||||
expectedErr: "error to read format blob: fake-get-blob-error",
|
||||
expected: RepoStatusUnknown,
|
||||
expectedErr: "error reading format blob: fake-get-blob-error",
|
||||
},
|
||||
{
|
||||
name: "wrong format",
|
||||
name: "no repo blob",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "fake-file",
|
||||
StorageType: udmrepo.StorageTypeAzure,
|
||||
},
|
||||
backendStore: new(storagemocks.Store),
|
||||
returnStore: new(storagemocks.Storage),
|
||||
retFuncGetBlob: func(context.Context, blob.ID, int64, int64, blob.OutputBuffer) error {
|
||||
return blob.ErrBlobNotFound
|
||||
},
|
||||
expected: RepoStatusSystemNotCreated,
|
||||
},
|
||||
{
|
||||
name: "wrong repo format",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "fake-file",
|
||||
StorageType: udmrepo.StorageTypeAzure,
|
||||
@@ -303,8 +337,105 @@ func TestIsBackupRepoCreated(t *testing.T) {
|
||||
output.Write([]byte("fake-buffer"))
|
||||
return nil
|
||||
},
|
||||
expected: RepoStatusCorrupted,
|
||||
expectedErr: "invalid format blob: invalid character 'k' in literal false (expecting 'l')",
|
||||
},
|
||||
{
|
||||
name: "get udm repo blob error",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "fake-file",
|
||||
StorageType: udmrepo.StorageTypeAzure,
|
||||
},
|
||||
backendStore: new(storagemocks.Store),
|
||||
returnStore: new(storagemocks.Storage),
|
||||
retFuncGetBlob: func(ctx context.Context, blobID blob.ID, offset int64, length int64, output blob.OutputBuffer) error {
|
||||
if blobID == udmRepoBlobID {
|
||||
return errors.New("fake-get-blob-error")
|
||||
} else {
|
||||
output.Write([]byte(`{"tool":"","buildVersion":"","buildInfo":"","uniqueID":[],"keyAlgo":"","encryption":""}`))
|
||||
return nil
|
||||
}
|
||||
},
|
||||
expected: RepoStatusUnknown,
|
||||
expectedErr: "error reading udm repo blob: fake-get-blob-error",
|
||||
},
|
||||
{
|
||||
name: "no udm repo blob",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "fake-file",
|
||||
StorageType: udmrepo.StorageTypeAzure,
|
||||
},
|
||||
backendStore: new(storagemocks.Store),
|
||||
returnStore: new(storagemocks.Storage),
|
||||
retFuncGetBlob: func(ctx context.Context, blobID blob.ID, offset int64, length int64, output blob.OutputBuffer) error {
|
||||
if blobID == udmRepoBlobID {
|
||||
return blob.ErrBlobNotFound
|
||||
} else {
|
||||
output.Write([]byte(`{"tool":"","buildVersion":"","buildInfo":"","uniqueID":[],"keyAlgo":"","encryption":""}`))
|
||||
return nil
|
||||
}
|
||||
},
|
||||
expected: RepoStatusNotInitialized,
|
||||
},
|
||||
{
|
||||
name: "wrong udm repo metadata",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "fake-file",
|
||||
StorageType: udmrepo.StorageTypeAzure,
|
||||
},
|
||||
backendStore: new(storagemocks.Store),
|
||||
returnStore: new(storagemocks.Storage),
|
||||
retFuncGetBlob: func(ctx context.Context, blobID blob.ID, offset int64, length int64, output blob.OutputBuffer) error {
|
||||
if blobID == udmRepoBlobID {
|
||||
output.Write([]byte("fake-buffer"))
|
||||
} else {
|
||||
output.Write([]byte(`{"tool":"","buildVersion":"","buildInfo":"","uniqueID":[],"keyAlgo":"","encryption":""}`))
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
expected: RepoStatusCorrupted,
|
||||
expectedErr: "invalid udm repo blob: invalid character 'k' in literal false (expecting 'l')",
|
||||
},
|
||||
{
|
||||
name: "wrong unique id",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "fake-file",
|
||||
StorageType: udmrepo.StorageTypeAzure,
|
||||
},
|
||||
backendStore: new(storagemocks.Store),
|
||||
returnStore: new(storagemocks.Storage),
|
||||
retFuncGetBlob: func(ctx context.Context, blobID blob.ID, offset int64, length int64, output blob.OutputBuffer) error {
|
||||
if blobID == udmRepoBlobID {
|
||||
output.Write([]byte(`{"uniqueID":[4,5,6]}`))
|
||||
} else {
|
||||
output.Write([]byte(`{"tool":"","buildVersion":"","buildInfo":"","uniqueID":[1,2,3],"keyAlgo":"","encryption":""}`))
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
expected: RepoStatusCorrupted,
|
||||
expectedErr: "unique ID doesn't match: [4 5 6]([1 2 3])",
|
||||
},
|
||||
{
|
||||
name: "succeed",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "fake-file",
|
||||
StorageType: udmrepo.StorageTypeAzure,
|
||||
},
|
||||
backendStore: new(storagemocks.Store),
|
||||
returnStore: new(storagemocks.Storage),
|
||||
retFuncGetBlob: func(ctx context.Context, blobID blob.ID, offset int64, length int64, output blob.OutputBuffer) error {
|
||||
if blobID == udmRepoBlobID {
|
||||
output.Write([]byte(`{"uniqueID":[1,2,3]}`))
|
||||
} else {
|
||||
output.Write([]byte(`{"tool":"","buildVersion":"","buildInfo":"","uniqueID":[1,2,3],"keyAlgo":"","encryption":""}`))
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
expected: RepoStatusCreated,
|
||||
},
|
||||
}
|
||||
|
||||
logger := velerotest.NewLogger()
|
||||
@@ -326,7 +457,7 @@ func TestIsBackupRepoCreated(t *testing.T) {
|
||||
tc.returnStore.On("GetBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.retFuncGetBlob)
|
||||
}
|
||||
|
||||
created, err := IsBackupRepoCreated(t.Context(), tc.repoOptions, logger)
|
||||
status, err := GetRepositoryStatus(t.Context(), tc.repoOptions, logger)
|
||||
|
||||
if tc.expectedErr == "" {
|
||||
require.NoError(t, err)
|
||||
@@ -334,7 +465,390 @@ func TestIsBackupRepoCreated(t *testing.T) {
|
||||
require.EqualError(t, err, tc.expectedErr)
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.expected, created)
|
||||
assert.Equal(t, tc.expected, status)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteInitParameters(t *testing.T) {
|
||||
var directRpo *repomocks.DirectRepository
|
||||
assertFullMaintIntervalEqual := func(expected, actual *maintenance.Params) bool {
|
||||
return assert.Equal(t, expected.FullCycle.Interval, actual.FullCycle.Interval)
|
||||
}
|
||||
testCases := []struct {
|
||||
name string
|
||||
repoOptions udmrepo.RepoOptions
|
||||
returnRepo *repomocks.DirectRepository
|
||||
returnRepoWriter *repomocks.DirectRepositoryWriter
|
||||
repoOpen func(context.Context, string, string, *repo.Options) (repo.Repository, error)
|
||||
newRepoWriterError error
|
||||
replaceManifestError error
|
||||
getParam func(context.Context, repo.Repository) (*maintenance.Params, error)
|
||||
// expected replacemanifest params to be received by maintenance.SetParams, and therefore writeInitParameters
|
||||
expectedReplaceManifestsParams *maintenance.Params
|
||||
// allows for asserting only certain fields are set as expected
|
||||
assertReplaceManifestsParams func(*maintenance.Params, *maintenance.Params) bool
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "repo open fail, repo not exist",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
GeneralOptions: map[string]string{},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return nil, os.ErrNotExist
|
||||
},
|
||||
expectedErr: "error to open repo, repo doesn't exist: file does not exist",
|
||||
},
|
||||
{
|
||||
name: "repo open fail, other error",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
GeneralOptions: map[string]string{},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return nil, errors.New("fake-repo-open-error")
|
||||
},
|
||||
expectedErr: "error to open repo: fake-repo-open-error",
|
||||
},
|
||||
{
|
||||
name: "get params error",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
GeneralOptions: map[string]string{},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return directRpo, nil
|
||||
},
|
||||
getParam: func(context.Context, repo.Repository) (*maintenance.Params, error) {
|
||||
return nil, errors.New("fake-get-param-error")
|
||||
},
|
||||
returnRepo: new(repomocks.DirectRepository),
|
||||
expectedErr: "error getting existing maintenance params: fake-get-param-error",
|
||||
},
|
||||
{
|
||||
name: "existing param with identical owner",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
GeneralOptions: map[string]string{},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return directRpo, nil
|
||||
},
|
||||
getParam: func(context.Context, repo.Repository) (*maintenance.Params, error) {
|
||||
return &maintenance.Params{
|
||||
Owner: "default@default",
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "existing param with different owner",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
GeneralOptions: map[string]string{},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return directRpo, nil
|
||||
},
|
||||
getParam: func(context.Context, repo.Repository) (*maintenance.Params, error) {
|
||||
return &maintenance.Params{
|
||||
Owner: "fake-owner",
|
||||
}, nil
|
||||
},
|
||||
returnRepo: new(repomocks.DirectRepository),
|
||||
returnRepoWriter: new(repomocks.DirectRepositoryWriter),
|
||||
expectedReplaceManifestsParams: &maintenance.Params{
|
||||
FullCycle: maintenance.CycleParams{
|
||||
Interval: udmrepo.NormalGCInterval,
|
||||
},
|
||||
},
|
||||
assertReplaceManifestsParams: assertFullMaintIntervalEqual,
|
||||
},
|
||||
{
|
||||
name: "write session fail",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
GeneralOptions: map[string]string{},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return directRpo, nil
|
||||
},
|
||||
returnRepo: new(repomocks.DirectRepository),
|
||||
newRepoWriterError: errors.New("fake-new-writer-error"),
|
||||
expectedErr: "error to init write repo parameters: unable to create writer: fake-new-writer-error",
|
||||
},
|
||||
{
|
||||
name: "set repo param fail",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
GeneralOptions: map[string]string{},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return directRpo, nil
|
||||
},
|
||||
returnRepo: new(repomocks.DirectRepository),
|
||||
returnRepoWriter: new(repomocks.DirectRepositoryWriter),
|
||||
replaceManifestError: errors.New("fake-replace-manifest-error"),
|
||||
expectedErr: "error to init write repo parameters: error to set maintenance params: put manifest: fake-replace-manifest-error",
|
||||
},
|
||||
{
|
||||
name: "repo with maintenance interval has expected params",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
StorageOptions: map[string]string{
|
||||
udmrepo.StoreOptionKeyFullMaintenanceInterval: string(udmrepo.FastGC),
|
||||
},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return directRpo, nil
|
||||
},
|
||||
returnRepo: new(repomocks.DirectRepository),
|
||||
returnRepoWriter: new(repomocks.DirectRepositoryWriter),
|
||||
expectedReplaceManifestsParams: &maintenance.Params{
|
||||
FullCycle: maintenance.CycleParams{
|
||||
Interval: udmrepo.FastGCInterval,
|
||||
},
|
||||
},
|
||||
assertReplaceManifestsParams: assertFullMaintIntervalEqual,
|
||||
},
|
||||
{
|
||||
name: "repo with empty maintenance interval has expected params",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
StorageOptions: map[string]string{
|
||||
udmrepo.StoreOptionKeyFullMaintenanceInterval: string(""),
|
||||
},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return directRpo, nil
|
||||
},
|
||||
returnRepo: new(repomocks.DirectRepository),
|
||||
returnRepoWriter: new(repomocks.DirectRepositoryWriter),
|
||||
expectedReplaceManifestsParams: &maintenance.Params{
|
||||
FullCycle: maintenance.CycleParams{
|
||||
Interval: udmrepo.NormalGCInterval,
|
||||
},
|
||||
},
|
||||
assertReplaceManifestsParams: assertFullMaintIntervalEqual,
|
||||
},
|
||||
{
|
||||
name: "repo with invalid maintenance interval has expected errors",
|
||||
repoOptions: udmrepo.RepoOptions{
|
||||
ConfigFilePath: "/tmp",
|
||||
StorageOptions: map[string]string{
|
||||
udmrepo.StoreOptionKeyFullMaintenanceInterval: string("foo"),
|
||||
},
|
||||
},
|
||||
repoOpen: func(context.Context, string, string, *repo.Options) (repo.Repository, error) {
|
||||
return directRpo, nil
|
||||
},
|
||||
returnRepo: new(repomocks.DirectRepository),
|
||||
returnRepoWriter: new(repomocks.DirectRepositoryWriter),
|
||||
expectedErr: "error to init write repo parameters: invalid full maintenance interval option foo",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
logger := velerotest.NewLogger()
|
||||
ctx := t.Context()
|
||||
|
||||
if tc.repoOpen != nil {
|
||||
kopiaRepoOpen = tc.repoOpen
|
||||
}
|
||||
|
||||
if tc.returnRepo != nil {
|
||||
directRpo = tc.returnRepo
|
||||
}
|
||||
|
||||
if tc.returnRepo != nil {
|
||||
tc.returnRepo.On("NewWriter", mock.Anything, mock.Anything).Return(ctx, tc.returnRepoWriter, tc.newRepoWriterError)
|
||||
tc.returnRepo.On("ClientOptions").Return(repo.ClientOptions{})
|
||||
tc.returnRepo.On("Close", mock.Anything).Return(nil)
|
||||
}
|
||||
|
||||
if tc.returnRepoWriter != nil {
|
||||
tc.returnRepoWriter.On("Close", mock.Anything).Return(nil)
|
||||
if tc.replaceManifestError != nil {
|
||||
tc.returnRepoWriter.On("ReplaceManifests", mock.Anything, mock.Anything, mock.Anything).Return(manifest.ID(""), tc.replaceManifestError)
|
||||
}
|
||||
if tc.expectedReplaceManifestsParams != nil {
|
||||
tc.returnRepoWriter.On("ReplaceManifests", mock.Anything, mock.AnythingOfType("map[string]string"), mock.AnythingOfType("*maintenance.Params")).Return(manifest.ID(""), nil)
|
||||
tc.returnRepoWriter.On("Flush", mock.Anything).Return(nil)
|
||||
}
|
||||
}
|
||||
|
||||
if tc.getParam != nil {
|
||||
funcGetParam = tc.getParam
|
||||
} else {
|
||||
funcGetParam = func(ctx context.Context, rep repo.Repository) (*maintenance.Params, error) {
|
||||
return &maintenance.Params{}, nil
|
||||
}
|
||||
}
|
||||
|
||||
err := writeInitParameters(ctx, tc.repoOptions, logger)
|
||||
|
||||
if tc.expectedErr == "" {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
require.EqualError(t, err, tc.expectedErr)
|
||||
}
|
||||
if tc.expectedReplaceManifestsParams != nil {
|
||||
actualReplaceManifestsParams, converted := tc.returnRepoWriter.Calls[0].Arguments.Get(2).(*maintenance.Params)
|
||||
assert.True(t, converted)
|
||||
tc.assertReplaceManifestsParams(tc.expectedReplaceManifestsParams, actualReplaceManifestsParams)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteUdmRepoMetadata(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
retFuncGetBlob func(context.Context, blob.ID, int64, int64, blob.OutputBuffer) error
|
||||
retFuncPutBlob func(context.Context, blob.ID, blob.Bytes, blob.PutOptions) error
|
||||
replaceMetadata *udmRepoMetadata
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "get repo blob error",
|
||||
retFuncGetBlob: func(context.Context, blob.ID, int64, int64, blob.OutputBuffer) error {
|
||||
return errors.New("fake-get-blob-error")
|
||||
},
|
||||
expectedErr: "error reading format blob: fake-get-blob-error",
|
||||
},
|
||||
{
|
||||
name: "wrong repo format",
|
||||
retFuncGetBlob: func(ctx context.Context, id blob.ID, offset int64, length int64, output blob.OutputBuffer) error {
|
||||
output.Write([]byte("fake-buffer"))
|
||||
return nil
|
||||
},
|
||||
expectedErr: "invalid format blob: invalid character 'k' in literal false (expecting 'l')",
|
||||
},
|
||||
{
|
||||
name: "put udm repo metadata blob error",
|
||||
retFuncGetBlob: func(ctx context.Context, blobID blob.ID, offset int64, length int64, output blob.OutputBuffer) error {
|
||||
output.Write([]byte(`{"tool":"","buildVersion":"","buildInfo":"","uniqueID":[],"keyAlgo":"","encryption":""}`))
|
||||
return nil
|
||||
},
|
||||
retFuncPutBlob: func(context.Context, blob.ID, blob.Bytes, blob.PutOptions) error {
|
||||
return errors.New("fake-put-blob-error")
|
||||
},
|
||||
expectedErr: "error writing udm repo metadata: fake-put-blob-error",
|
||||
},
|
||||
{
|
||||
name: "succeed",
|
||||
retFuncGetBlob: func(ctx context.Context, blobID blob.ID, offset int64, length int64, output blob.OutputBuffer) error {
|
||||
output.Write([]byte(`{"tool":"","buildVersion":"","buildInfo":"","uniqueID":[],"keyAlgo":"","encryption":""}`))
|
||||
return nil
|
||||
},
|
||||
retFuncPutBlob: func(context.Context, blob.ID, blob.Bytes, blob.PutOptions) error {
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
storage := new(storagemocks.Storage)
|
||||
if tc.retFuncGetBlob != nil {
|
||||
storage.On("GetBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.retFuncGetBlob)
|
||||
}
|
||||
|
||||
if tc.retFuncPutBlob != nil {
|
||||
storage.On("PutBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.retFuncPutBlob)
|
||||
}
|
||||
|
||||
err := writeUdmRepoMetadata(t.Context(), storage)
|
||||
|
||||
if tc.expectedErr == "" {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
require.EqualError(t, err, tc.expectedErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type testRecv struct {
|
||||
buffer []byte
|
||||
}
|
||||
|
||||
func (r *testRecv) Write(p []byte) (n int, err error) {
|
||||
r.buffer = append(r.buffer, p...)
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func TestByteBuffer(t *testing.T) {
|
||||
buffer := &byteBuffer{}
|
||||
written, err := buffer.Write([]byte("12345"))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 5, written)
|
||||
|
||||
written, err = buffer.Write([]byte("67890"))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 5, written)
|
||||
require.Equal(t, 10, buffer.Length())
|
||||
|
||||
recv := &testRecv{}
|
||||
copied, err := buffer.WriteTo(recv)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(10), copied)
|
||||
require.Equal(t, []byte("1234567890"), recv.buffer)
|
||||
|
||||
buffer.Reset()
|
||||
require.Zero(t, buffer.Length())
|
||||
}
|
||||
|
||||
func TestByteBufferReader(t *testing.T) {
|
||||
buffer := &byteBufferReader{buffer: []byte("123456789012345678901234567890")}
|
||||
off, err := buffer.Seek(100, io.SeekStart)
|
||||
require.Equal(t, int64(-1), off)
|
||||
require.EqualError(t, err, "invalid seek")
|
||||
require.Zero(t, buffer.pos)
|
||||
|
||||
off, err = buffer.Seek(-100, io.SeekEnd)
|
||||
require.Equal(t, int64(-1), off)
|
||||
require.EqualError(t, err, "invalid seek")
|
||||
require.Zero(t, buffer.pos)
|
||||
|
||||
off, err = buffer.Seek(3, io.SeekCurrent)
|
||||
require.Equal(t, int64(3), off)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 3, buffer.pos)
|
||||
|
||||
output := make([]byte, 6)
|
||||
read, err := buffer.Read(output)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 6, read)
|
||||
require.Equal(t, 9, buffer.pos)
|
||||
require.Equal(t, []byte("456789"), output)
|
||||
|
||||
off, err = buffer.Seek(21, io.SeekStart)
|
||||
require.Equal(t, int64(21), off)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 21, buffer.pos)
|
||||
|
||||
output = make([]byte, 6)
|
||||
read, err = buffer.Read(output)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 6, read)
|
||||
require.Equal(t, 27, buffer.pos)
|
||||
require.Equal(t, []byte("234567"), output)
|
||||
|
||||
output = make([]byte, 6)
|
||||
read, err = buffer.Read(output)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 3, read)
|
||||
require.Equal(t, 30, buffer.pos)
|
||||
require.Equal(t, []byte{'8', '9', '0', 0, 0, 0}, output)
|
||||
|
||||
output = make([]byte, 6)
|
||||
read, err = buffer.Read(output)
|
||||
require.Zero(t, read)
|
||||
require.Equal(t, io.EOF, err)
|
||||
|
||||
err = buffer.Close()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user