Merge pull request #8580 from Lyndon-Li/recall-repo-maintenance-history-on-restart
Some checks failed
Run the E2E test on kind / build (push) Failing after 5m31s
Run the E2E test on kind / setup-test-matrix (push) Successful in 2s
Run the E2E test on kind / run-e2e-test (push) Has been skipped
Main CI / Build (push) Failing after 32s
Close stale issues and PRs / stale (push) Successful in 9s
Trivy Nightly Scan / Trivy nightly scan (velero, main) (push) Failing after 53s
Trivy Nightly Scan / Trivy nightly scan (velero-restore-helper, main) (push) Failing after 49s

Recall repo maintenance history on restart
This commit is contained in:
lyndon-li
2025-01-17 14:08:27 +08:00
committed by GitHub
13 changed files with 2537 additions and 1260 deletions

View File

@@ -0,0 +1 @@
Fix issue #7753, recall repo maintenance history on Velero server restart

2
go.mod
View File

@@ -31,6 +31,7 @@ require (
github.com/kubernetes-csi/external-snapshotter/client/v7 v7.0.0
github.com/onsi/ginkgo/v2 v2.19.0
github.com/onsi/gomega v1.33.1
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.20.5
github.com/robfig/cron/v3 v3.0.1
@@ -154,7 +155,6 @@ require (
github.com/natefinch/atomic v1.0.1 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/oklog/run v1.0.0 // indirect
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect

View File

@@ -5,28 +5,35 @@ import (
"fmt"
"os"
"strings"
"time"
"github.com/bombsimon/logrusr/v3"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerocli "github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/repository/provider"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
repomanager "github.com/vmware-tanzu/velero/pkg/repository/manager"
)
// Options carries the settings for one repository maintenance (prune) run.
type Options struct {
// RepoName identifies the repository to maintain.
// NOTE(review): presumably used when looking up the BackupRepository — confirm against runRepoPrune.
RepoName string
// BackupStorageLocation is passed as the backup location when looking up the
// BackupRepository to prune.
BackupStorageLocation string
// RepoType is passed as the repository type when looking up the
// BackupRepository to prune.
RepoType string
// ResourceTimeout is a timeout setting.
// NOTE(review): its consumer is not visible in this section — confirm where it is applied.
ResourceTimeout time.Duration
// LogLevelFlag is parsed in Run to pick the logger's level.
LogLevelFlag *logging.LevelFlag
// FormatFlag is parsed in Run to pick the logger's output format.
FormatFlag *logging.FormatFlag
}
@@ -61,6 +68,8 @@ func (o *Options) Run(f velerocli.Factory) {
logger := logging.DefaultLogger(o.LogLevelFlag.Parse(), o.FormatFlag.Parse())
logger.SetOutput(os.Stdout)
ctrl.SetLogger(logrusr.New(logger))
pruneError := o.runRepoPrune(f, f.Namespace(), logger)
defer func() {
if pruneError != nil {
@@ -110,12 +119,14 @@ func (o *Options) initClient(f velerocli.Factory) (client.Client, error) {
return cli, nil
}
func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger logrus.FieldLogger) error {
cli, err := o.initClient(f)
if err != nil {
return err
func initRepoManager(namespace string, cli client.Client, kubeClient kubernetes.Interface, logger logrus.FieldLogger) (repomanager.Manager, error) {
// ensure the repo key secret is set up
if err := repokey.EnsureCommonRepositoryKey(kubeClient.CoreV1(), namespace); err != nil {
return nil, errors.Wrap(err, "failed to ensure repository key")
}
repoLocker := repository.NewRepoLocker()
credentialFileStore, err := credentials.NewNamespacedFileStore(
cli,
namespace,
@@ -123,23 +134,38 @@ func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger log
filesystem.NewFileSystem(),
)
if err != nil {
return errors.Wrap(err, "failed to create namespaced file store")
return nil, errors.Wrap(err, "failed to create namespaced file store")
}
credentialSecretStore, err := credentials.NewNamespacedSecretStore(cli, namespace)
if err != nil {
return errors.Wrap(err, "failed to create namespaced secret store")
return nil, errors.Wrap(err, "failed to create namespaced secret store")
}
var repoProvider provider.Provider
if o.RepoType == velerov1api.BackupRepositoryTypeRestic {
repoProvider = provider.NewResticRepositoryProvider(credentialFileStore, filesystem.NewFileSystem(), logger)
} else {
repoProvider = provider.NewUnifiedRepoProvider(
credentials.CredentialGetter{
FromFile: credentialFileStore,
FromSecret: credentialSecretStore,
}, o.RepoType, logger)
return repomanager.NewManager(
namespace,
cli,
repoLocker,
credentialFileStore,
credentialSecretStore,
logger,
), nil
}
func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger logrus.FieldLogger) error {
cli, err := o.initClient(f)
if err != nil {
return err
}
kubeClient, err := f.KubeClient()
if err != nil {
return err
}
manager, err := initRepoManager(namespace, cli, kubeClient, logger)
if err != nil {
return err
}
// backupRepository
@@ -149,31 +175,14 @@ func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger log
BackupLocation: o.BackupStorageLocation,
RepositoryType: o.RepoType,
}, true)
if err != nil {
return errors.Wrap(err, "failed to get backup repository")
}
// bsl
bsl := &velerov1api.BackupStorageLocation{}
err = cli.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: repo.Spec.BackupStorageLocation}, bsl)
if err != nil {
return errors.Wrap(err, "failed to get backup storage location")
}
para := provider.RepoParam{
BackupRepo: repo,
BackupLocation: bsl,
}
err = repoProvider.BoostRepoConnect(context.Background(), para)
if err != nil {
return errors.Wrap(err, "failed to boost repo connect")
}
err = repoProvider.PruneRepo(context.Background(), para)
err = manager.PruneRepo(repo)
if err != nil {
return errors.Wrap(err, "failed to prune repo")
}
return nil
}

View File

@@ -502,15 +502,9 @@ func (s *server) initRepoManager() error {
s.namespace,
s.mgr.GetClient(),
s.repoLocker,
s.repoEnsurer,
s.credentialFileStore,
s.credentialSecretStore,
s.config.RepoMaintenanceJobConfig,
s.config.PodResources,
s.config.KeepLatestMaintenanceJobs,
s.logger,
s.logLevel,
s.config.LogFormat,
)
return nil
@@ -731,9 +725,14 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.namespace,
s.logger,
s.mgr.GetClient(),
s.repoManager,
s.config.RepoMaintenanceFrequency,
s.config.BackupRepoConfig,
s.repoManager,
s.config.KeepLatestMaintenanceJobs,
s.config.RepoMaintenanceJobConfig,
s.config.PodResources,
s.logLevel,
s.config.LogFormat,
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerBackupRepo)
}

View File

@@ -22,8 +22,10 @@ import (
"encoding/json"
"fmt"
"reflect"
"slices"
"time"
"github.com/petar/GoLLRB/llrb"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -41,8 +43,10 @@ import (
"github.com/vmware-tanzu/velero/pkg/constant"
"github.com/vmware-tanzu/velero/pkg/label"
repoconfig "github.com/vmware-tanzu/velero/pkg/repository/config"
"github.com/vmware-tanzu/velero/pkg/repository/maintenance"
repomanager "github.com/vmware-tanzu/velero/pkg/repository/manager"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/logging"
)
const (
@@ -53,16 +57,22 @@ const (
type BackupRepoReconciler struct {
client.Client
namespace string
logger logrus.FieldLogger
clock clocks.WithTickerAndDelayedExecution
maintenanceFrequency time.Duration
backupRepoConfig string
repositoryManager repomanager.Manager
namespace string
logger logrus.FieldLogger
clock clocks.WithTickerAndDelayedExecution
maintenanceFrequency time.Duration
backupRepoConfig string
repositoryManager repomanager.Manager
keepLatestMaintenanceJobs int
repoMaintenanceConfig string
maintenanceJobResources kube.PodResources
logLevel logrus.Level
logFormat *logging.FormatFlag
}
func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client client.Client,
maintenanceFrequency time.Duration, backupRepoConfig string, repositoryManager repomanager.Manager) *BackupRepoReconciler {
func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client client.Client, repositoryManager repomanager.Manager,
maintenanceFrequency time.Duration, backupRepoConfig string, keepLatestMaintenanceJobs int, repoMaintenanceConfig string, maintenanceJobResources kube.PodResources,
logLevel logrus.Level, logFormat *logging.FormatFlag) *BackupRepoReconciler {
c := &BackupRepoReconciler{
client,
namespace,
@@ -71,6 +81,11 @@ func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client
maintenanceFrequency,
backupRepoConfig,
repositoryManager,
keepLatestMaintenanceJobs,
repoMaintenanceConfig,
maintenanceJobResources,
logLevel,
logFormat,
}
return c
@@ -206,7 +221,17 @@ func (r *BackupRepoReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
fallthrough
case velerov1api.BackupRepositoryPhaseReady:
return ctrl.Result{}, r.runMaintenanceIfDue(ctx, backupRepo, log)
if err := r.recallMaintenance(ctx, backupRepo, log); err != nil {
return ctrl.Result{}, errors.Wrap(err, "error handling incomplete repo maintenance jobs")
}
if err := r.runMaintenanceIfDue(ctx, backupRepo, log); err != nil {
return ctrl.Result{}, errors.Wrap(err, "error check and run repo maintenance jobs")
}
if err := maintenance.DeleteOldJobs(r.Client, req.Name, r.keepLatestMaintenanceJobs); err != nil {
log.WithError(err).Warn("Failed to delete old maintenance jobs")
}
}
return ctrl.Result{}, nil
@@ -299,6 +324,99 @@ func ensureRepo(repo *velerov1api.BackupRepository, repoManager repomanager.Mana
return repoManager.PrepareRepo(repo)
}
// recallMaintenance waits for the repository's maintenance jobs to complete,
// merges the statuses they report into the history already recorded on the
// BackupRepository, and patches the repository when the merged history
// differs from the recorded one.
//
// LastMaintenanceTime is only moved forward: it is replaced when the merged
// history contains a completion time later than the recorded one, or when
// nothing has been recorded yet.
//
// It returns an error when waiting for the jobs fails or when the patch fails.
func (r *BackupRepoReconciler) recallMaintenance(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {
	history, err := maintenance.WaitAllJobsComplete(ctx, r.Client, req, defaultMaintenanceStatusQueueLength, log)
	if err != nil {
		return errors.Wrapf(err, "error waiting incomplete repo maintenance job for repo %s", req.Name)
	}

	consolidated := consolidateHistory(history, req.Status.RecentMaintenance)
	if consolidated == nil {
		// Nothing new to record.
		return nil
	}

	// May be nil when none of the consolidated entries carries a completion time.
	lastMaintenanceTime := getLastMaintenanceTimeFromHistory(consolidated)

	log.Warn("Updating backup repository because of unrecorded histories")

	return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
		// A repository that has never been maintained has no recorded time;
		// treat that as the zero time instead of dereferencing a nil pointer.
		var recorded time.Time
		if rr.Status.LastMaintenanceTime != nil {
			recorded = rr.Status.LastMaintenanceTime.Time
		}

		if lastMaintenanceTime != nil && (rr.Status.LastMaintenanceTime == nil || lastMaintenanceTime.After(recorded)) {
			log.Warnf("Updating backup repository last maintenance time (%v) from history (%v)", recorded, lastMaintenanceTime.Time)
			rr.Status.LastMaintenanceTime = lastMaintenanceTime
		}

		rr.Status.RecentMaintenance = consolidated
	})
}
// maintenanceStatusWrapper lets a maintenance status entry be stored in an
// llrb tree, keyed by the entry's start timestamp.
type maintenanceStatusWrapper struct {
	status *velerov1api.BackupRepositoryMaintenanceStatus
}

// Less reports whether w started before other. other must also be a
// maintenanceStatusWrapper; any other item type makes the assertion panic.
func (w maintenanceStatusWrapper) Less(other llrb.Item) bool {
	rhs := other.(maintenanceStatusWrapper)
	return w.status.StartTimestamp.Before(rhs.status.StartTimestamp)
}
// consolidateHistory merges the newly reported maintenance statuses (coming)
// into the recorded ones (cur), keyed by start timestamp. When both slices
// hold an entry with the same start timestamp, the one from coming wins
// because it is inserted last.
//
// The result is ordered oldest first and keeps at most
// defaultMaintenanceStatusQueueLength of the most recent entries. nil is
// returned when there is nothing to change: coming is empty, or the start
// timestamps of cur already match those of coming or of the merged result.
func consolidateHistory(coming, cur []velerov1api.BackupRepositoryMaintenanceStatus) []velerov1api.BackupRepositoryMaintenanceStatus {
	if len(coming) == 0 {
		return nil
	}

	// Two entries are treated as the same record when they started at the same time.
	sameStart := func(a, b velerov1api.BackupRepositoryMaintenanceStatus) bool {
		return a.StartTimestamp.Equal(b.StartTimestamp)
	}

	if slices.EqualFunc(cur, coming, sameStart) {
		return nil
	}

	tree := llrb.New()
	// cur goes in first so that entries from coming replace duplicates.
	for _, batch := range [][]velerov1api.BackupRepositoryMaintenanceStatus{cur, coming} {
		for i := range batch {
			tree.ReplaceOrInsert(maintenanceStatusWrapper{&batch[i]})
		}
	}

	// Pop from newest to oldest until the tree is drained or the queue is full.
	merged := []velerov1api.BackupRepositoryMaintenanceStatus{}
	for tree.Len() > 0 && len(merged) != defaultMaintenanceStatusQueueLength {
		newest := tree.DeleteMax().(maintenanceStatusWrapper)
		merged = append(merged, *newest.status)
	}

	// Restore chronological order.
	slices.Reverse(merged)

	if slices.EqualFunc(cur, merged, sameStart) {
		return nil
	}

	return merged
}
// getLastMaintenanceTimeFromHistory returns the latest CompleteTimestamp found
// in history. Entries without a completion time are skipped. It returns nil
// when history is empty or no entry has a completion time, so callers must
// check the result before dereferencing it.
func getLastMaintenanceTimeFromHistory(history []velerov1api.BackupRepositoryMaintenanceStatus) *metav1.Time {
	var latest *metav1.Time

	for i := range history {
		completed := history[i].CompleteTimestamp
		if completed == nil {
			continue
		}

		if latest == nil || latest.Before(completed) {
			latest = completed
		}
	}

	return latest
}
var funcStartMaintenanceJob = maintenance.StartNewJob
var funcWaitMaintenanceJobComplete = maintenance.WaitJobComplete
func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {
startTime := r.clock.Now()
@@ -309,29 +427,39 @@ func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *vel
log.Info("Running maintenance on backup repository")
// prune failures should be displayed in the `.status.message` field but
// should not cause the repo to move to `NotReady`.
log.Debug("Pruning repo")
if err := r.repositoryManager.PruneRepo(req); err != nil {
log.WithError(err).Warn("error pruning repository")
job, err := funcStartMaintenanceJob(r.Client, ctx, req, r.repoMaintenanceConfig, r.maintenanceJobResources, r.logLevel, r.logFormat, log)
if err != nil {
log.WithError(err).Warn("Starting repo maintenance failed")
return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceFailed, startTime, r.clock.Now(), err.Error())
updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceFailed, &metav1.Time{Time: startTime}, nil, fmt.Sprintf("Failed to start maintenance job, err: %v", err))
})
}
// When WaitMaintenanceJobComplete fails, the maintenance result is left aside temporarily.
// If the maintenance still completes later, recallMaintenance recalls the leftover results and updates LastMaintenanceTime and the history.
status, err := funcWaitMaintenanceJobComplete(r.Client, ctx, job, r.namespace, log)
if err != nil {
return errors.Wrapf(err, "error waiting repo maintenance completion status")
}
if status.Result == velerov1api.BackupRepositoryMaintenanceFailed {
log.WithError(err).Warn("Pruning repository failed")
return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceFailed, status.StartTimestamp, status.CompleteTimestamp, status.Message)
})
}
return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
completionTime := r.clock.Now()
rr.Status.LastMaintenanceTime = &metav1.Time{Time: completionTime}
updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceSucceeded, startTime, completionTime, "")
rr.Status.LastMaintenanceTime = &metav1.Time{Time: status.CompleteTimestamp.Time}
updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceSucceeded, status.StartTimestamp, status.CompleteTimestamp, status.Message)
})
}
func updateRepoMaintenanceHistory(repo *velerov1api.BackupRepository, result velerov1api.BackupRepositoryMaintenanceResult, startTime time.Time, completionTime time.Time, message string) {
func updateRepoMaintenanceHistory(repo *velerov1api.BackupRepository, result velerov1api.BackupRepositoryMaintenanceResult, startTime, completionTime *metav1.Time, message string) {
latest := velerov1api.BackupRepositoryMaintenanceStatus{
Result: result,
StartTimestamp: &metav1.Time{Time: startTime},
CompleteTimestamp: &metav1.Time{Time: completionTime},
StartTimestamp: startTime,
CompleteTimestamp: completionTime,
Message: message,
}

View File

@@ -19,37 +19,51 @@ import (
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/repository/maintenance"
repomokes "github.com/vmware-tanzu/velero/pkg/repository/mocks"
repotypes "github.com/vmware-tanzu/velero/pkg/repository/types"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
batchv1 "k8s.io/api/batch/v1"
)
const testMaintenanceFrequency = 10 * time.Minute
func mockBackupRepoReconciler(t *testing.T, mockOn string, arg interface{}, ret interface{}) *BackupRepoReconciler {
func mockBackupRepoReconciler(t *testing.T, mockOn string, arg interface{}, ret ...interface{}) *BackupRepoReconciler {
t.Helper()
mgr := &repomokes.Manager{}
if mockOn != "" {
mgr.On(mockOn, arg).Return(ret)
mgr.On(mockOn, arg).Return(ret...)
}
return NewBackupRepoReconciler(
velerov1api.DefaultNamespace,
velerotest.NewLogger(),
velerotest.NewFakeControllerRuntimeClient(t),
mgr,
testMaintenanceFrequency,
"fake-repo-config",
mgr,
3,
"",
kube.PodResources{},
logrus.InfoLevel,
nil,
)
}
@@ -104,21 +118,280 @@ func TestCheckNotReadyRepo(t *testing.T) {
assert.Equal(t, "s3:test.amazonaws.com/bucket/restic/volume-ns-1", rr.Spec.ResticIdentifier)
}
func TestRunMaintenanceIfDue(t *testing.T) {
rr := mockBackupRepositoryCR()
reconciler := mockBackupRepoReconciler(t, "PruneRepo", rr, nil)
err := reconciler.Client.Create(context.TODO(), rr)
assert.NoError(t, err)
lastTm := rr.Status.LastMaintenanceTime
err = reconciler.runMaintenanceIfDue(context.TODO(), rr, reconciler.logger)
assert.NoError(t, err)
assert.NotEqual(t, rr.Status.LastMaintenanceTime, lastTm)
// startMaintenanceJobFail is a test stub with the signature of the maintenance
// job starter; it ignores every argument and always reports "fake-start-error".
func startMaintenanceJobFail(_ client.Client, _ context.Context, _ *velerov1api.BackupRepository, _ string, _ kube.PodResources, _ logrus.Level, _ *logging.FormatFlag, _ logrus.FieldLogger) (string, error) {
	return "", errors.New("fake-start-error")
}
rr.Status.LastMaintenanceTime = &metav1.Time{Time: time.Now()}
lastTm = rr.Status.LastMaintenanceTime
err = reconciler.runMaintenanceIfDue(context.TODO(), rr, reconciler.logger)
assert.NoError(t, err)
assert.Equal(t, rr.Status.LastMaintenanceTime, lastTm)
// startMaintenanceJobSucceed is a test stub with the signature of the
// maintenance job starter; it ignores every argument and always returns the
// job name "fake-job-name" with no error.
func startMaintenanceJobSucceed(_ client.Client, _ context.Context, _ *velerov1api.BackupRepository, _ string, _ kube.PodResources, _ logrus.Level, _ *logging.FormatFlag, _ logrus.FieldLogger) (string, error) {
	return "fake-job-name", nil
}
// waitMaintenanceJobCompleteFail is a test stub with the signature of the
// maintenance job waiter; it ignores every argument and always returns an
// empty status together with "fake-wait-error".
func waitMaintenanceJobCompleteFail(_ client.Client, _ context.Context, _ string, _ string, _ logrus.FieldLogger) (velerov1api.BackupRepositoryMaintenanceStatus, error) {
	var empty velerov1api.BackupRepositoryMaintenanceStatus
	return empty, errors.New("fake-wait-error")
}
// waitMaintenanceJobCompleteFunc builds a test stub with the signature of the
// maintenance job waiter. The stub ignores its arguments and reports a job
// that started at now, completed one hour later, and ended with the given
// result and message.
func waitMaintenanceJobCompleteFunc(now time.Time, result velerov1api.BackupRepositoryMaintenanceResult, message string) func(client.Client, context.Context, string, string, logrus.FieldLogger) (velerov1api.BackupRepositoryMaintenanceStatus, error) {
	return func(_ client.Client, _ context.Context, _ string, _ string, _ logrus.FieldLogger) (velerov1api.BackupRepositoryMaintenanceStatus, error) {
		// Fresh timestamp pointers are built on every call so returned statuses never alias.
		status := velerov1api.BackupRepositoryMaintenanceStatus{
			Result:  result,
			Message: message,
		}
		status.StartTimestamp = &metav1.Time{Time: now}
		status.CompleteTimestamp = &metav1.Time{Time: now.Add(time.Hour)}

		return status, nil
	}
}
// fakeClock is a test clock frozen at a fixed instant. Only Now returns
// something meaningful; every other method is a stub returning a zero value.
type fakeClock struct {
	now time.Time
}

// Now returns the fixed instant the clock was created with.
func (c *fakeClock) Now() time.Time { return c.now }

// Since always reports a zero duration.
func (*fakeClock) Since(time.Time) time.Duration { return 0 }

// Sleep returns immediately.
func (*fakeClock) Sleep(time.Duration) {}

// After returns a nil channel.
func (*fakeClock) After(time.Duration) <-chan time.Time { return nil }

// Tick returns a nil channel.
func (*fakeClock) Tick(time.Duration) <-chan time.Time { return nil }

// NewTicker returns a nil ticker.
func (*fakeClock) NewTicker(time.Duration) clock.Ticker { return nil }

// NewTimer returns a nil timer.
func (*fakeClock) NewTimer(time.Duration) clock.Timer { return nil }

// AfterFunc never schedules the callback and returns a nil timer.
func (*fakeClock) AfterFunc(time.Duration, func()) clock.Timer { return nil }
func TestRunMaintenanceIfDue(t *testing.T) {
now := time.Now().Round(time.Second)
tests := []struct {
name string
repo *velerov1api.BackupRepository
startJobFunc func(client.Client, context.Context, *velerov1api.BackupRepository, string, kube.PodResources, logrus.Level, *logging.FormatFlag, logrus.FieldLogger) (string, error)
waitJobFunc func(client.Client, context.Context, string, string, logrus.FieldLogger) (velerov1api.BackupRepositoryMaintenanceStatus, error)
expectedMaintenanceTime time.Time
expectedHistory []velerov1api.BackupRepositoryMaintenanceStatus
expectedErr string
}{
{
name: "not due",
repo: &velerov1api.BackupRepository{
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1api.DefaultNamespace,
Name: "repo",
},
Spec: velerov1api.BackupRepositorySpec{
MaintenanceFrequency: metav1.Duration{Duration: time.Hour},
},
Status: velerov1api.BackupRepositoryStatus{
LastMaintenanceTime: &metav1.Time{Time: now},
RecentMaintenance: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour)},
CompleteTimestamp: &metav1.Time{Time: now},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
},
},
},
},
expectedMaintenanceTime: now,
expectedHistory: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour)},
CompleteTimestamp: &metav1.Time{Time: now},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
},
},
},
{
name: "start failed",
repo: &velerov1api.BackupRepository{
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1api.DefaultNamespace,
Name: "repo",
},
Spec: velerov1api.BackupRepositorySpec{
MaintenanceFrequency: metav1.Duration{Duration: time.Hour},
},
Status: velerov1api.BackupRepositoryStatus{
LastMaintenanceTime: &metav1.Time{Time: now.Add(-time.Hour - time.Minute)},
RecentMaintenance: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(-time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
},
},
},
},
startJobFunc: startMaintenanceJobFail,
expectedMaintenanceTime: now.Add(-time.Hour - time.Minute),
expectedHistory: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(-time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
},
{
StartTimestamp: &metav1.Time{Time: now},
Result: velerov1api.BackupRepositoryMaintenanceFailed,
Message: "Failed to start maintenance job, err: fake-start-error",
},
},
},
{
name: "wait failed",
repo: &velerov1api.BackupRepository{
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1api.DefaultNamespace,
Name: "repo",
},
Spec: velerov1api.BackupRepositorySpec{
MaintenanceFrequency: metav1.Duration{Duration: time.Hour},
},
Status: velerov1api.BackupRepositoryStatus{
LastMaintenanceTime: &metav1.Time{Time: now.Add(-time.Hour - time.Minute)},
RecentMaintenance: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(-time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
},
},
},
},
startJobFunc: startMaintenanceJobSucceed,
waitJobFunc: waitMaintenanceJobCompleteFail,
expectedErr: "error waiting repo maintenance completion status: fake-wait-error",
expectedMaintenanceTime: now.Add(-time.Hour - time.Minute),
expectedHistory: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(-time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
},
},
},
{
name: "maintenance failed",
repo: &velerov1api.BackupRepository{
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1api.DefaultNamespace,
Name: "repo",
},
Spec: velerov1api.BackupRepositorySpec{
MaintenanceFrequency: metav1.Duration{Duration: time.Hour},
},
Status: velerov1api.BackupRepositoryStatus{
LastMaintenanceTime: &metav1.Time{Time: now.Add(-time.Hour - time.Minute)},
RecentMaintenance: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(-time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
},
},
},
},
startJobFunc: startMaintenanceJobSucceed,
waitJobFunc: waitMaintenanceJobCompleteFunc(now, velerov1api.BackupRepositoryMaintenanceFailed, "fake-maintenance-message"),
expectedMaintenanceTime: now.Add(-time.Hour - time.Minute),
expectedHistory: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(-time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
},
{
StartTimestamp: &metav1.Time{Time: now},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceFailed,
Message: "fake-maintenance-message",
},
},
},
{
name: "maintenance succeeded",
repo: &velerov1api.BackupRepository{
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1api.DefaultNamespace,
Name: "repo",
},
Spec: velerov1api.BackupRepositorySpec{
MaintenanceFrequency: metav1.Duration{Duration: time.Hour},
},
Status: velerov1api.BackupRepositoryStatus{
LastMaintenanceTime: &metav1.Time{Time: now.Add(-time.Hour - time.Minute)},
RecentMaintenance: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(-time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
},
},
},
},
startJobFunc: startMaintenanceJobSucceed,
waitJobFunc: waitMaintenanceJobCompleteFunc(now, velerov1api.BackupRepositoryMaintenanceSucceeded, ""),
expectedMaintenanceTime: now.Add(time.Hour),
expectedHistory: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(-time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
},
{
StartTimestamp: &metav1.Time{Time: now},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
reconciler := mockBackupRepoReconciler(t, "", test.repo, nil)
reconciler.clock = &fakeClock{now}
err := reconciler.Client.Create(context.TODO(), test.repo)
assert.NoError(t, err)
funcStartMaintenanceJob = test.startJobFunc
funcWaitMaintenanceJobComplete = test.waitJobFunc
err = reconciler.runMaintenanceIfDue(context.TODO(), test.repo, velerotest.NewLogger())
if test.expectedErr == "" {
assert.NoError(t, err)
}
assert.Equal(t, test.expectedMaintenanceTime, test.repo.Status.LastMaintenanceTime.Time)
assert.Len(t, test.repo.Status.RecentMaintenance, len(test.expectedHistory))
for i := 0; i < len(test.expectedHistory); i++ {
assert.Equal(t, test.expectedHistory[i].StartTimestamp.Time, test.repo.Status.RecentMaintenance[i].StartTimestamp.Time)
if test.expectedHistory[i].CompleteTimestamp == nil {
assert.Nil(t, test.repo.Status.RecentMaintenance[i].CompleteTimestamp)
} else {
assert.Equal(t, test.expectedHistory[i].CompleteTimestamp.Time, test.repo.Status.RecentMaintenance[i].CompleteTimestamp.Time)
}
assert.Equal(t, test.expectedHistory[i].Result, test.repo.Status.RecentMaintenance[i].Result)
assert.Equal(t, test.expectedHistory[i].Message, test.repo.Status.RecentMaintenance[i].Message)
}
})
}
}
func TestInitializeRepo(t *testing.T) {
@@ -248,9 +521,14 @@ func TestGetRepositoryMaintenanceFrequency(t *testing.T) {
velerov1api.DefaultNamespace,
velerotest.NewLogger(),
velerotest.NewFakeControllerRuntimeClient(t),
&mgr,
test.userDefinedFreq,
"",
&mgr,
3,
"",
kube.PodResources{},
logrus.InfoLevel,
nil,
)
freq := reconciler.getRepositoryMaintenanceFrequency(test.repo)
@@ -377,7 +655,14 @@ func TestNeedInvalidBackupRepo(t *testing.T) {
velerov1api.DefaultNamespace,
velerotest.NewLogger(),
velerotest.NewFakeControllerRuntimeClient(t),
time.Duration(0), "", nil)
nil,
time.Duration(0),
"",
3,
"",
kube.PodResources{},
logrus.InfoLevel,
nil)
need := reconciler.needInvalidBackupRepo(test.oldBSL, test.newBSL)
assert.Equal(t, test.expect, need)
@@ -653,7 +938,7 @@ func TestUpdateRepoMaintenanceHistory(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
updateRepoMaintenanceHistory(test.backupRepo, test.result, standardTime, standardTime.Add(time.Hour), "fake-message-0")
updateRepoMaintenanceHistory(test.backupRepo, test.result, &metav1.Time{Time: standardTime}, &metav1.Time{Time: standardTime.Add(time.Hour)}, "fake-message-0")
for at := range test.backupRepo.Status.RecentMaintenance {
assert.Equal(t, test.expectedHistory[at].StartTimestamp.Time, test.backupRepo.Status.RecentMaintenance[at].StartTimestamp.Time)
@@ -663,3 +948,467 @@ func TestUpdateRepoMaintenanceHistory(t *testing.T) {
})
}
}
// TestRecallMaintenance drives recallMaintenance against a fake client. It
// covers a failure while waiting for jobs (the scheme lacks the batch types),
// a run with no jobs to consolidate, and a completed job whose result is
// recalled into the history with and without moving LastMaintenanceTime.
func TestRecallMaintenance(t *testing.T) {
	now := time.Now().Round(time.Second)

	// schemeFail deliberately omits the batch/core types so that listing jobs fails.
	schemeFail := runtime.NewScheme()
	assert.NoError(t, velerov1api.AddToScheme(schemeFail))

	scheme := runtime.NewScheme()
	assert.NoError(t, batchv1.AddToScheme(scheme))
	assert.NoError(t, corev1.AddToScheme(scheme))
	assert.NoError(t, velerov1api.AddToScheme(scheme))

	jobSucceeded := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:              "job1",
			Namespace:         velerov1api.DefaultNamespace,
			Labels:            map[string]string{maintenance.RepositoryNameLabel: "repo"},
			CreationTimestamp: metav1.Time{Time: now.Add(time.Hour)},
		},
		Status: batchv1.JobStatus{
			StartTime:      &metav1.Time{Time: now.Add(time.Hour)},
			CompletionTime: &metav1.Time{Time: now.Add(time.Hour * 2)},
			Succeeded:      1,
		},
	}

	jobPodSucceeded := builder.ForPod(velerov1api.DefaultNamespace, "job1").Labels(map[string]string{"job-name": "job1"}).ContainerStatuses(&corev1.ContainerStatus{
		State: corev1.ContainerState{
			Terminated: &corev1.ContainerStateTerminated{},
		},
	}).Result()

	tests := []struct {
		name                 string
		kubeClientObj        []runtime.Object
		runtimeScheme        *runtime.Scheme
		repoLastMaintainTime metav1.Time
		expectNewHistory     []velerov1api.BackupRepositoryMaintenanceStatus
		expectTimeUpdate     *metav1.Time
		expectedErr          string
	}{
		{
			name:          "wait completion error",
			runtimeScheme: schemeFail,
			expectedErr:   "error waiting incomplete repo maintenance job for repo repo: error listing maintenance job for repo repo: no kind is registered for the type v1.JobList in scheme \"pkg/runtime/scheme.go:100\"",
		},
		{
			name:          "no consolidate result",
			runtimeScheme: scheme,
		},
		{
			name:          "no update last time",
			runtimeScheme: scheme,
			kubeClientObj: []runtime.Object{
				jobSucceeded,
				jobPodSucceeded,
			},
			repoLastMaintainTime: metav1.Time{Time: now.Add(time.Hour * 5)},
			expectNewHistory: []velerov1api.BackupRepositoryMaintenanceStatus{
				{
					StartTimestamp:    &metav1.Time{Time: now.Add(time.Hour)},
					CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
					Result:            velerov1api.BackupRepositoryMaintenanceSucceeded,
				},
			},
		},
		{
			name:          "update last time",
			runtimeScheme: scheme,
			kubeClientObj: []runtime.Object{
				jobSucceeded,
				jobPodSucceeded,
			},
			expectNewHistory: []velerov1api.BackupRepositoryMaintenanceStatus{
				{
					StartTimestamp:    &metav1.Time{Time: now.Add(time.Hour)},
					CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
					Result:            velerov1api.BackupRepositoryMaintenanceSucceeded,
				},
			},
			expectTimeUpdate: &metav1.Time{Time: now.Add(time.Hour * 2)},
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			r := mockBackupRepoReconciler(t, "", nil, nil)

			backupRepo := mockBackupRepositoryCR()
			backupRepo.Status.LastMaintenanceTime = &test.repoLastMaintainTime

			test.kubeClientObj = append(test.kubeClientObj, backupRepo)

			fakeClientBuilder := clientFake.NewClientBuilder()
			fakeClientBuilder = fakeClientBuilder.WithScheme(test.runtimeScheme)
			fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()
			r.Client = fakeClient

			// Remember the pointer so an untouched LastMaintenanceTime can be detected.
			lastTm := backupRepo.Status.LastMaintenanceTime

			err := r.recallMaintenance(context.TODO(), backupRepo, velerotest.NewLogger())
			if test.expectedErr != "" {
				assert.EqualError(t, err, test.expectedErr)
				return
			}

			assert.NoError(t, err)

			if test.expectNewHistory == nil {
				assert.Nil(t, backupRepo.Status.RecentMaintenance)
			} else {
				assert.Len(t, backupRepo.Status.RecentMaintenance, len(test.expectNewHistory))
				for i := 0; i < len(test.expectNewHistory); i++ {
					want, got := test.expectNewHistory[i], backupRepo.Status.RecentMaintenance[i]

					assert.Equal(t, want.StartTimestamp.Time, got.StartTimestamp.Time)
					assert.Equal(t, want.CompleteTimestamp.Time, got.CompleteTimestamp.Time)
					assert.Equal(t, want.Result, got.Result)
					assert.Equal(t, want.Message, got.Message)
				}
			}

			if test.expectTimeUpdate != nil {
				assert.Equal(t, test.expectTimeUpdate.Time, backupRepo.Status.LastMaintenanceTime.Time)
			} else {
				assert.Equal(t, lastTm, backupRepo.Status.LastMaintenanceTime)
			}
		})
	}
}
func TestConsolidateHistory(t *testing.T) {
now := time.Now()
tests := []struct {
name string
cur []velerov1api.BackupRepositoryMaintenanceStatus
coming []velerov1api.BackupRepositoryMaintenanceStatus
expected []velerov1api.BackupRepositoryMaintenanceStatus
}{
{
name: "zero coming",
cur: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message",
},
},
expected: nil,
},
{
name: "identical coming",
cur: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message",
},
},
coming: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
},
},
expected: nil,
},
{
name: "less than limit",
cur: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
Result: velerov1api.BackupRepositoryMaintenanceFailed,
Message: "fake-maintenance-message-2",
},
},
coming: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message-3",
},
},
expected: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
Result: velerov1api.BackupRepositoryMaintenanceFailed,
Message: "fake-maintenance-message-2",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message-3",
},
},
},
{
name: "more than limit",
cur: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
Result: velerov1api.BackupRepositoryMaintenanceFailed,
Message: "fake-maintenance-message-2",
},
},
coming: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message-3",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 4)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message-4",
},
},
expected: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
Result: velerov1api.BackupRepositoryMaintenanceFailed,
Message: "fake-maintenance-message-2",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message-3",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 4)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message-4",
},
},
},
{
name: "more than limit 2",
cur: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
Result: velerov1api.BackupRepositoryMaintenanceFailed,
Message: "fake-maintenance-message-2",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message-3",
},
},
coming: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
Result: velerov1api.BackupRepositoryMaintenanceFailed,
Message: "fake-maintenance-message-2",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message-3",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 4)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message-4",
},
},
expected: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
Result: velerov1api.BackupRepositoryMaintenanceFailed,
Message: "fake-maintenance-message-2",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message-3",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 4)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message-4",
},
},
},
{
name: "coming is not used",
cur: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message-3",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 4)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message-4",
},
},
coming: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
},
},
expected: nil,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
consolidated := consolidateHistory(test.coming, test.cur)
if test.expected == nil {
assert.Nil(t, consolidated)
} else {
assert.Len(t, consolidated, len(test.expected))
for i := 0; i < len(test.expected); i++ {
assert.Equal(t, *test.expected[i].StartTimestamp, *consolidated[i].StartTimestamp)
assert.Equal(t, *test.expected[i].CompleteTimestamp, *consolidated[i].CompleteTimestamp)
assert.Equal(t, test.expected[i].Result, consolidated[i].Result)
assert.Equal(t, test.expected[i].Message, consolidated[i].Message)
}
assert.Nil(t, consolidateHistory(test.coming, consolidated))
}
})
}
}
func TestGetLastMaintenanceTimeFromHistory(t *testing.T) {
now := time.Now()
tests := []struct {
name string
history []velerov1api.BackupRepositoryMaintenanceStatus
expected time.Time
}{
{
name: "first one is nil",
history: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
Result: velerov1api.BackupRepositoryMaintenanceFailed,
Message: "fake-maintenance-message-2",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message-3",
},
},
expected: now.Add(time.Hour * 3),
},
{
name: "another one is nil",
history: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceFailed,
Message: "fake-maintenance-message-2",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message-3",
},
},
expected: now.Add(time.Hour * 3),
},
{
name: "disordered",
history: []velerov1api.BackupRepositoryMaintenanceStatus{
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message-3",
},
{
StartTimestamp: &metav1.Time{Time: now},
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceSucceeded,
Message: "fake-maintenance-message",
},
{
StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)},
Result: velerov1api.BackupRepositoryMaintenanceFailed,
Message: "fake-maintenance-message-2",
},
},
expected: now.Add(time.Hour * 3),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
time := getLastMaintenanceTimeFromHistory(test.history)
assert.Equal(t, test.expected, time.Time)
})
}
}

View File

@@ -1,252 +0,0 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package repository
import (
"context"
"encoding/json"
"fmt"
"sort"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
const (
// RepositoryNameLabel is the label key whose value is the backup repository
// name; maintenance jobs are listed and selected by this label.
RepositoryNameLabel = "velero.io/repo-name"
// GlobalKeyForRepoMaintenanceJobCM is the key in the repo maintenance job
// ConfigMap holding the configuration that fills in settings a
// repository-specific entry does not provide.
GlobalKeyForRepoMaintenanceJobCM = "global"
)
// JobConfigs is the maintenance job configuration that is decoded from the
// JSON values stored in the repo maintenance job ConfigMap.
type JobConfigs struct {
// LoadAffinities is the config for repository maintenance job load affinity.
LoadAffinities []*kube.LoadAffinity `json:"loadAffinity,omitempty"`
// PodResources is the config for the CPU and memory resources setting.
PodResources *kube.PodResources `json:"podResources,omitempty"`
}
// GenerateJobName returns a maintenance job name of the form
// "<repo>-maintain-job-<unix milliseconds>". When that name would exceed the
// 63-character Kubernetes job name limit, the repository name is replaced by
// the fixed prefix "repo".
func GenerateJobName(repo string) string {
	const maxJobNameLen = 63 // k8s job name length limit

	stamp := time.Now().UTC().UnixMilli()
	if name := fmt.Sprintf("%s-maintain-job-%d", repo, stamp); len(name) <= maxJobNameLen {
		return name
	}
	return fmt.Sprintf("repo-maintain-job-%d", stamp)
}
// DeleteOldMaintenanceJobs deletes old maintenance jobs and keeps the latest N jobs.
//
// Jobs are selected by the RepositoryNameLabel label with the value repo and
// ordered by creation timestamp; the oldest ones are deleted with background
// propagation until at most keep jobs remain. A negative keep is rejected with
// an error. The first list or delete error is returned as is.
func DeleteOldMaintenanceJobs(cli client.Client, repo string, keep int) error {
	// A negative keep would make the number of jobs to delete larger than the
	// number of jobs listed and index past the end of the list.
	if keep < 0 {
		return fmt.Errorf("invalid number of maintenance jobs to keep: %d", keep)
	}

	// Get the maintenance job list by label
	jobList := &batchv1.JobList{}
	if err := cli.List(context.TODO(), jobList, client.MatchingLabels(map[string]string{RepositoryNameLabel: repo})); err != nil {
		return err
	}

	excess := len(jobList.Items) - keep
	if excess <= 0 {
		return nil
	}

	// Oldest first, so that the first excess entries are the ones to delete.
	sort.Slice(jobList.Items, func(i, j int) bool {
		return jobList.Items[i].CreationTimestamp.Before(&jobList.Items[j].CreationTimestamp)
	})
	for i := 0; i < excess; i++ {
		if err := cli.Delete(context.TODO(), &jobList.Items[i], client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
			return err
		}
	}
	return nil
}
// WaitForJobComplete polls the given maintenance job once per second until it
// has succeeded or failed, or until ctx is done.
//
// The job object is refreshed in place on every poll. A NotFound error from
// the client is tolerated: polling continues with whatever status the job
// object holds. It returns nil when the job reports a success, an error when
// it reports a failure, and the polling error (for example the context error)
// otherwise.
func WaitForJobComplete(ctx context.Context, client client.Client, job *batchv1.Job) error {
	// The interval has to be an explicit duration: a bare 1 is one nanosecond
	// and turns the poll into a busy loop against the API server.
	return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
		err := client.Get(ctx, types.NamespacedName{Namespace: job.Namespace, Name: job.Name}, job)
		if err != nil && !apierrors.IsNotFound(err) {
			return false, err
		}

		if job.Status.Succeeded > 0 {
			return true, nil
		}

		if job.Status.Failed > 0 {
			return true, fmt.Errorf("maintenance job %s/%s failed", job.Namespace, job.Name)
		}
		return false, nil
	})
}
// GetMaintenanceResultFromJob returns the termination message of the first
// container of the first pod that carries the "job-name" label of the given
// job. It returns an error when the pods cannot be listed, when no pod or no
// container status exists, or when that container has not terminated.
func GetMaintenanceResultFromJob(cli client.Client, job *batchv1.Job) (string, error) {
	// Look up the pods created for the job through the job-name label.
	pods := &v1.PodList{}
	if err := cli.List(context.TODO(), pods, client.InNamespace(job.Namespace), client.MatchingLabels{"job-name": job.Name}); err != nil {
		return "", err
	}
	if len(pods.Items) == 0 {
		return "", fmt.Errorf("no pod found for job %s", job.Name)
	}

	// A maintenance job has a single pod, so only the first one is inspected.
	containerStatuses := pods.Items[0].Status.ContainerStatuses
	if len(containerStatuses) == 0 {
		return "", fmt.Errorf("no container statuses found for job %s", job.Name)
	}

	// Likewise the pod has a single maintenance container.
	if finished := containerStatuses[0].State.Terminated; finished != nil {
		return finished.Message, nil
	}
	return "", fmt.Errorf("container for job %s is not terminated", job.Name)
}
// GetLatestMaintenanceJob returns the most recently created job in namespace
// ns that carries the RepositoryNameLabel label, or nil when there is none.
func GetLatestMaintenanceJob(cli client.Client, ns string) (*batchv1.Job, error) {
	// Any job that has the repository name label counts as a maintenance job.
	jobs := &batchv1.JobList{}
	if err := cli.List(
		context.TODO(),
		jobs,
		&client.ListOptions{Namespace: ns},
		&client.HasLabels{RepositoryNameLabel},
	); err != nil {
		return nil, err
	}

	if len(jobs.Items) == 0 {
		return nil, nil
	}

	// Newest first; the head of the sorted list is the latest job.
	sort.Slice(jobs.Items, func(a, b int) bool {
		return jobs.Items[a].CreationTimestamp.Time.After(jobs.Items[b].CreationTimestamp.Time)
	})
	return &jobs.Items[0], nil
}
// GetMaintenanceJobConfig reads the maintenance job configuration for the
// given BackupRepository from the repo maintenance job ConfigMap.
//
// Params:
//
//	ctx: the Go context used for controller-runtime client.
//	client: the controller-runtime client.
//	logger: the logger.
//	veleroNamespace: the Velero-installed namespace. The ConfigMap is read from it.
//	repoMaintenanceJobConfig: the repository maintenance job ConfigMap name.
//	repo: the BackupRepository needs to run the maintenance Job.
//
// It returns (nil, nil) when the ConfigMap does not exist or when it holds
// neither a repository-specific nor a global entry. The repository-specific
// entry wins; pod resources and load affinities it leaves unset are taken
// from the global entry.
func GetMaintenanceJobConfig(
	ctx context.Context,
	client client.Client,
	logger logrus.FieldLogger,
	veleroNamespace string,
	repoMaintenanceJobConfig string,
	repo *velerov1api.BackupRepository,
) (*JobConfigs, error) {
	cmKey := types.NamespacedName{
		Namespace: veleroNamespace,
		Name:      repoMaintenanceJobConfig,
	}

	var cm v1.ConfigMap
	if err := client.Get(ctx, cmKey, &cm); err != nil {
		if apierrors.IsNotFound(err) {
			return nil, nil
		}
		return nil, errors.Wrapf(
			err,
			"fail to get repo maintenance job configs %s", repoMaintenanceJobConfig)
	}

	if cm.Data == nil {
		return nil, errors.Errorf("data is not available in config map %s", repoMaintenanceJobConfig)
	}

	// Generate the BackupRepository key.
	// Using the BackupRepository name would be more intuitive, but
	// BackupRepositories are generated dynamically, so they cannot be assumed
	// to exist when Velero is installed. The volume source namespace, the BSL
	// name and the uploader type are used instead; together they identify a
	// unique BackupRepository.
	repoJobConfigKey := fmt.Sprintf("%s-%s-%s",
		repo.Spec.VolumeNamespace, repo.Spec.BackupStorageLocation, repo.Spec.RepositoryType)

	var result *JobConfigs
	if raw, found := cm.Data[repoJobConfigKey]; found {
		logger.Debugf("Find the repo maintenance config %s for repo %s", repoJobConfigKey, repo.Name)
		result = new(JobConfigs)
		if err := json.Unmarshal([]byte(raw), result); err != nil {
			return nil, errors.Wrapf(
				err,
				"fail to unmarshal configs from %s's key %s",
				repoMaintenanceJobConfig,
				repoJobConfigKey)
		}
	}

	rawGlobal, hasGlobal := cm.Data[GlobalKeyForRepoMaintenanceJobCM]
	if !hasGlobal {
		return result, nil
	}

	logger.Debugf("Find the global repo maintenance config for repo %s", repo.Name)
	if result == nil {
		result = new(JobConfigs)
	}

	globalResult := new(JobConfigs)
	if err := json.Unmarshal([]byte(rawGlobal), globalResult); err != nil {
		return nil, errors.Wrapf(
			err,
			"fail to unmarshal configs from %s's key %s",
			repoMaintenanceJobConfig,
			GlobalKeyForRepoMaintenanceJobCM)
	}

	// The global entry only fills in what the repository entry left unset.
	if result.PodResources == nil && globalResult.PodResources != nil {
		result.PodResources = globalResult.PodResources
	}
	if len(result.LoadAffinities) == 0 {
		result.LoadAffinities = globalResult.LoadAffinities
	}

	return result, nil
}

View File

@@ -0,0 +1,523 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package maintenance
import (
"context"
"encoding/json"
"fmt"
"math"
"sort"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/util/kube"
appsv1 "k8s.io/api/apps/v1"
veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero"
"github.com/vmware-tanzu/velero/pkg/util/logging"
)
const (
// RepositoryNameLabel is the label key whose value is the backup repository
// name; it is set on maintenance jobs and their pod template, and jobs are
// listed by it.
RepositoryNameLabel = "velero.io/repo-name"
// GlobalKeyForRepoMaintenanceJobCM is the key in the repo maintenance job
// ConfigMap holding the configuration that fills in settings a
// repository-specific entry does not provide.
GlobalKeyForRepoMaintenanceJobCM = "global"
)
// JobConfigs is the maintenance job configuration that is decoded from the
// JSON values stored in the repo maintenance job ConfigMap.
type JobConfigs struct {
// LoadAffinities is the config for repository maintenance job load affinity.
LoadAffinities []*kube.LoadAffinity `json:"loadAffinity,omitempty"`
// PodResources is the config for the CPU and memory resources setting.
PodResources *kube.PodResources `json:"podResources,omitempty"`
}
// GenerateJobName builds the name of a maintenance job from the repository
// name and the current UTC time in milliseconds. If the resulting name is
// longer than 63 characters (the k8s job name length limit), a generic
// "repo-maintain-job-<milliseconds>" name is returned instead.
func GenerateJobName(repo string) string {
	nowMilli := time.Now().UTC().UnixMilli()

	candidate := fmt.Sprintf("%s-maintain-job-%d", repo, nowMilli)
	if len(candidate) <= 63 {
		return candidate
	}

	return fmt.Sprintf("repo-maintain-job-%d", nowMilli)
}
// DeleteOldJobs deletes old maintenance jobs and keeps the latest N jobs.
//
// Jobs are selected by the RepositoryNameLabel label with the value repo and
// ordered by creation timestamp; the oldest ones are deleted with background
// propagation until at most keep jobs remain. A negative keep is rejected with
// an error. The first list or delete error is returned as is.
func DeleteOldJobs(cli client.Client, repo string, keep int) error {
	// A negative keep would make the number of jobs to delete larger than the
	// number of jobs listed and index past the end of the list.
	if keep < 0 {
		return fmt.Errorf("invalid number of maintenance jobs to keep: %d", keep)
	}

	// Get the maintenance job list by label
	jobList := &batchv1.JobList{}
	if err := cli.List(context.TODO(), jobList, client.MatchingLabels(map[string]string{RepositoryNameLabel: repo})); err != nil {
		return err
	}

	excess := len(jobList.Items) - keep
	if excess <= 0 {
		return nil
	}

	// Oldest first, so that the first excess entries are the ones to delete.
	sort.Slice(jobList.Items, func(i, j int) bool {
		return jobList.Items[i].CreationTimestamp.Before(&jobList.Items[j].CreationTimestamp)
	})
	for i := 0; i < excess; i++ {
		if err := cli.Delete(context.TODO(), &jobList.Items[i], client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
			return err
		}
	}
	return nil
}
// waitCompletionBackOff paces the "job is still running" warnings logged by
// waitForJobComplete; it does not affect how often the job is polled.
var waitCompletionBackOff = wait.Backoff{
// Delay before the first warning.
Duration: time.Minute * 20,
// Effectively unlimited number of steps.
Steps: math.MaxInt,
// Each following delay is twice the previous one ...
Factor: 2,
// ... up to this upper bound.
Cap: time.Hour * 12,
}
// waitForJobComplete polls the job ns/job once per second until it reports a
// success or a failure, or until ctx is done, and returns the job object that
// was fetched last together with the polling error, if any.
//
// A NotFound error from the client does not stop the wait; in that case the
// poll goes on with an empty job object. While the job keeps running, a
// warning is logged at the checkpoints produced by waitCompletionBackOff.
func waitForJobComplete(ctx context.Context, client client.Client, ns string, job string, logger logrus.FieldLogger) (*batchv1.Job, error) {
	var latest *batchv1.Job

	// Work on a copy so that every wait starts from the initial back-off.
	reminder := waitCompletionBackOff
	began := time.Now()
	nextWarning := began.Add(reminder.Step())
	key := types.NamespacedName{Namespace: ns, Name: job}

	pollErr := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
		current := &batchv1.Job{}
		if err := client.Get(ctx, key, current); err != nil && !apierrors.IsNotFound(err) {
			return false, err
		}

		latest = current

		// Either outcome ends the wait; the caller inspects the job status.
		if current.Status.Succeeded > 0 || current.Status.Failed > 0 {
			return true, nil
		}

		if now := time.Now(); now.After(nextWarning) {
			logger.Warnf("Repo maintenance job %s has lasted %v minutes", job, now.Sub(began).Minutes())
			nextWarning = now.Add(reminder.Step())
		}

		return false, nil
	})

	return latest, pollErr
}
// getResultFromJob returns the termination message of the first container of
// the first pod labelled with the job's name. An error is returned when the
// pods cannot be listed, when there is no pod or no container status, or when
// the container has not terminated yet.
func getResultFromJob(cli client.Client, job *batchv1.Job) (string, error) {
	// The pods of a job are found through the job-name label.
	pods := &v1.PodList{}
	if err := cli.List(
		context.TODO(),
		pods,
		client.InNamespace(job.Namespace),
		client.MatchingLabels(map[string]string{"job-name": job.Name}),
	); err != nil {
		return "", err
	}

	if len(pods.Items) == 0 {
		return "", fmt.Errorf("no pod found for job %s", job.Name)
	}

	// Only one maintenance pod exists per job, so the first one is used.
	containers := pods.Items[0].Status.ContainerStatuses
	if len(containers) == 0 {
		return "", fmt.Errorf("no container statuses found for job %s", job.Name)
	}

	// Only one maintenance container exists per pod.
	end := containers[0].State.Terminated
	if end == nil {
		return "", fmt.Errorf("container for job %s is not terminated", job.Name)
	}

	return end.Message, nil
}
// getJobConfig reads the maintenance job configuration for the given
// BackupRepository from the repo maintenance job ConfigMap.
//
// Params:
//
//	ctx: the Go context used for controller-runtime client.
//	client: the controller-runtime client.
//	logger: the logger.
//	veleroNamespace: the Velero-installed namespace. The ConfigMap is read from it.
//	repoMaintenanceJobConfig: the repository maintenance job ConfigMap name.
//	repo: the BackupRepository needs to run the maintenance Job.
//
// It returns (nil, nil) when the ConfigMap does not exist or when it holds
// neither a repository-specific nor a global entry. The repository-specific
// entry wins; pod resources and load affinities it leaves unset are taken
// from the global entry.
func getJobConfig(
	ctx context.Context,
	client client.Client,
	logger logrus.FieldLogger,
	veleroNamespace string,
	repoMaintenanceJobConfig string,
	repo *velerov1api.BackupRepository,
) (*JobConfigs, error) {
	var cm v1.ConfigMap
	cmKey := types.NamespacedName{Namespace: veleroNamespace, Name: repoMaintenanceJobConfig}
	if err := client.Get(ctx, cmKey, &cm); err != nil {
		if apierrors.IsNotFound(err) {
			return nil, nil
		}
		return nil, errors.Wrapf(
			err,
			"fail to get repo maintenance job configs %s", repoMaintenanceJobConfig)
	}

	if cm.Data == nil {
		return nil, errors.Errorf("data is not available in config map %s", repoMaintenanceJobConfig)
	}

	// Generate the BackupRepository key.
	// Using the BackupRepository name would be more intuitive, but
	// BackupRepositories are generated dynamically, so they cannot be assumed
	// to exist when Velero is installed. The volume source namespace, the BSL
	// name and the uploader type are used instead; together they identify a
	// unique BackupRepository.
	repoJobConfigKey := fmt.Sprintf("%s-%s-%s",
		repo.Spec.VolumeNamespace, repo.Spec.BackupStorageLocation, repo.Spec.RepositoryType)

	var result *JobConfigs
	if repoRaw, ok := cm.Data[repoJobConfigKey]; ok {
		logger.Debugf("Find the repo maintenance config %s for repo %s", repoJobConfigKey, repo.Name)
		result = new(JobConfigs)
		if err := json.Unmarshal([]byte(repoRaw), result); err != nil {
			return nil, errors.Wrapf(
				err,
				"fail to unmarshal configs from %s's key %s",
				repoMaintenanceJobConfig,
				repoJobConfigKey)
		}
	}

	globalRaw, ok := cm.Data[GlobalKeyForRepoMaintenanceJobCM]
	if !ok {
		return result, nil
	}

	logger.Debugf("Find the global repo maintenance config for repo %s", repo.Name)
	if result == nil {
		result = new(JobConfigs)
	}

	globalResult := new(JobConfigs)
	if err := json.Unmarshal([]byte(globalRaw), globalResult); err != nil {
		return nil, errors.Wrapf(
			err,
			"fail to unmarshal configs from %s's key %s",
			repoMaintenanceJobConfig,
			GlobalKeyForRepoMaintenanceJobCM)
	}

	// The global entry only fills in what the repository entry left unset.
	if result.PodResources == nil && globalResult.PodResources != nil {
		result.PodResources = globalResult.PodResources
	}
	if len(result.LoadAffinities) == 0 {
		result.LoadAffinities = globalResult.LoadAffinities
	}

	return result, nil
}
// WaitJobComplete waits for the specified maintenance job to complete and
// returns the BackupRepositoryMaintenanceStatus composed from it.
//
// For a failed job the status message is the result read from the job's pod;
// when that result cannot be read, a fixed fallback message is used instead.
// An error is returned only when waiting for the job fails.
func WaitJobComplete(cli client.Client, ctx context.Context, jobName, ns string, logger logrus.FieldLogger) (velerov1api.BackupRepositoryMaintenanceStatus, error) {
	log := logger.WithField("job name", jobName)

	maintenanceJob, waitErr := waitForJobComplete(ctx, cli, ns, jobName, logger)
	if waitErr != nil {
		return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.Wrap(waitErr, "error to wait for maintenance job complete")
	}

	log.Infof("Maintenance repo complete, succeeded %v, failed %v", maintenanceJob.Status.Succeeded, maintenanceJob.Status.Failed)

	// Only failed jobs get a message; successful ones keep it empty.
	message := ""
	if maintenanceJob.Status.Failed > 0 {
		jobResult, getErr := getResultFromJob(cli, maintenanceJob)
		if getErr != nil {
			log.WithError(getErr).Warn("Failed to get maintenance job result")
			jobResult = "Repo maintenance failed but result is not retrieveable"
		}
		message = jobResult
	}

	return composeStatusFromJob(maintenanceJob, message), nil
}
// WaitAllJobsComplete checks the latest maintenance jobs of the specified repo,
// waits for the ones that have not completed yet, and returns the status of
// each of them, oldest first.
//
// Jobs are found in the repo's namespace through the RepositoryNameLabel label
// and ordered by creation timestamp; only the newest limit jobs are
// considered. For a failed job the status message is the result read from the
// job's pod, or a fixed fallback message when that result cannot be read.
//
// It returns (nil, nil) when the repo has no maintenance job, and an error
// when listing the jobs or waiting for one of them fails. Both the listing and
// the waiting honor ctx.
func WaitAllJobsComplete(ctx context.Context, cli client.Client, repo *velerov1api.BackupRepository, limit int, log logrus.FieldLogger) ([]velerov1api.BackupRepositoryMaintenanceStatus, error) {
	jobList := &batchv1.JobList{}
	// Use the caller's context so that the listing is cancelled together with
	// the waits below.
	err := cli.List(ctx, jobList, &client.ListOptions{
		Namespace: repo.Namespace,
	},
		client.MatchingLabels(map[string]string{RepositoryNameLabel: repo.Name}),
	)
	if err != nil {
		return nil, errors.Wrapf(err, "error listing maintenance job for repo %s", repo.Name)
	}

	if len(jobList.Items) == 0 {
		return nil, nil
	}

	// Oldest first, so that the tail of the list holds the newest jobs.
	sort.Slice(jobList.Items, func(i, j int) bool {
		return jobList.Items[i].CreationTimestamp.Time.Before(jobList.Items[j].CreationTimestamp.Time)
	})

	history := []velerov1api.BackupRepositoryMaintenanceStatus{}

	// Skip everything but the newest limit jobs.
	startPos := len(jobList.Items) - limit
	if startPos < 0 {
		startPos = 0
	}

	for i := startPos; i < len(jobList.Items); i++ {
		job := &jobList.Items[i]

		if job.Status.Succeeded == 0 && job.Status.Failed == 0 {
			log.Infof("Waiting for maintenance job %s to complete", job.Name)

			updated, err := waitForJobComplete(ctx, cli, job.Namespace, job.Name, log)
			if err != nil {
				return nil, errors.Wrapf(err, "error waiting maintenance job[%s] complete", job.Name)
			}

			// Continue with the refreshed object, which carries the final status.
			job = updated
		}

		message := ""
		if job.Status.Failed > 0 {
			if msg, err := getResultFromJob(cli, job); err != nil {
				log.WithError(err).Warnf("Failed to get result of maintenance job %s", job.Name)
				message = "Repo maintenance failed but result is not retrieveable"
			} else {
				message = msg
			}
		}

		history = append(history, composeStatusFromJob(job, message))
	}

	return history, nil
}
// StartNewJob creates a new maintenance job for the given repository and
// returns the name of the created job.
//
// The repository's BackupStorageLocation must exist in the repository's
// namespace. The job configuration is read from the ConfigMap named by
// repoMaintenanceJobConfig; when it cannot be read, a warning is logged and the
// job is built without it. podResources, logLevel and logFormat are passed on
// to the job builder.
//
// An error is returned when the BackupStorageLocation cannot be fetched or
// when building or creating the job fails. All API calls honor ctx.
func StartNewJob(cli client.Client, ctx context.Context, repo *velerov1api.BackupRepository, repoMaintenanceJobConfig string,
	podResources kube.PodResources, logLevel logrus.Level, logFormat *logging.FormatFlag, logger logrus.FieldLogger) (string, error) {
	bsl := &velerov1api.BackupStorageLocation{}
	if err := cli.Get(ctx, client.ObjectKey{Namespace: repo.Namespace, Name: repo.Spec.BackupStorageLocation}, bsl); err != nil {
		return "", errors.WithStack(err)
	}

	log := logger.WithFields(logrus.Fields{
		"BSL name":  bsl.Name,
		"repo type": repo.Spec.RepositoryType,
		"repo name": repo.Name,
		"repo UID":  repo.UID,
	})

	jobConfig, err := getJobConfig(
		ctx,
		cli,
		log,
		repo.Namespace,
		repoMaintenanceJobConfig,
		repo,
	)
	if err != nil {
		// Best effort: a broken configuration must not block maintenance, so
		// the job is built without it.
		log.Warnf("Fail to find the ConfigMap %s to build maintenance job with error: %s. Use default value.",
			repo.Namespace+"/"+repoMaintenanceJobConfig,
			err.Error(),
		)
	}

	log.Info("Starting maintenance repo")

	maintenanceJob, err := buildJob(cli, ctx, repo, bsl.Name, jobConfig, podResources, logLevel, logFormat)
	if err != nil {
		return "", errors.Wrap(err, "error to build maintenance job")
	}

	log = log.WithField("job", fmt.Sprintf("%s/%s", maintenanceJob.Namespace, maintenanceJob.Name))

	// Create with the caller's context, like every other API call above.
	if err := cli.Create(ctx, maintenanceJob); err != nil {
		return "", errors.Wrap(err, "error to create maintenance job")
	}

	log.Info("Repo maintenance job started")

	return maintenanceJob.Name, nil
}
// buildJob assembles, without creating it, the batch Job that runs
// "/velero repo-maintenance" for the given repository.
//
// The image, environment, volumes, service account, tolerations, node
// selector, labels and annotations are whatever the veleroutil helpers extract
// from the "velero" deployment in the repository's namespace. Pod resources
// come from config.PodResources when it is set and from podResources
// otherwise; load affinities come from config only.
//
// An error is returned when the deployment cannot be fetched or when the
// resource settings cannot be parsed.
func buildJob(cli client.Client, ctx context.Context, repo *velerov1api.BackupRepository, bslName string, config *JobConfigs,
	podResources kube.PodResources, logLevel logrus.Level, logFormat *logging.FormatFlag) (*batchv1.Job, error) {
	// The Velero server deployment is the template for the job's pod.
	server := &appsv1.Deployment{}
	if err := cli.Get(ctx, types.NamespacedName{Name: "velero", Namespace: repo.Namespace}, server); err != nil {
		return nil, err
	}

	// Settings inherited from the Velero server deployment.
	env := veleroutil.GetEnvVarsFromVeleroServer(server)
	envFrom := veleroutil.GetEnvFromSourcesFromVeleroServer(server)
	mounts := veleroutil.GetVolumeMountsFromVeleroServer(server)
	vols := veleroutil.GetVolumesFromVeleroServer(server)
	account := veleroutil.GetServiceAccountFromVeleroServer(server)
	serverImage := veleroutil.GetVeleroServerImage(server)

	// Resource settings from the job config replace the passed-in ones as a whole.
	cpuRequest, memRequest := podResources.CPURequest, podResources.MemoryRequest
	cpuLimit, memLimit := podResources.CPULimit, podResources.MemoryLimit
	if config != nil && config.PodResources != nil {
		cpuRequest, memRequest = config.PodResources.CPURequest, config.PodResources.MemoryRequest
		cpuLimit, memLimit = config.PodResources.CPULimit, config.PodResources.MemoryLimit
	}

	requirements, err := kube.ParseResourceRequirements(cpuRequest, memRequest, cpuLimit, memLimit)
	if err != nil {
		return nil, errors.Wrap(err, "failed to parse resource requirements for maintenance job")
	}

	// Command line of the maintenance container.
	args := []string{
		"repo-maintenance",
		fmt.Sprintf("--repo-name=%s", repo.Spec.VolumeNamespace),
		fmt.Sprintf("--repo-type=%s", repo.Spec.RepositoryType),
		fmt.Sprintf("--backup-storage-location=%s", bslName),
		fmt.Sprintf("--log-level=%s", logLevel.String()),
		fmt.Sprintf("--log-format=%s", logFormat.String()),
	}

	maintenanceContainer := v1.Container{
		Name:            "velero-repo-maintenance-container",
		Image:           serverImage,
		Command:         []string{"/velero"},
		Args:            args,
		ImagePullPolicy: v1.PullIfNotPresent,
		Env:             env,
		EnvFrom:         envFrom,
		VolumeMounts:    mounts,
		Resources:       requirements,
	}

	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      GenerateJobName(repo.Name),
			Namespace: repo.Namespace,
			Labels: map[string]string{
				RepositoryNameLabel: repo.Name,
			},
		},
		Spec: batchv1.JobSpec{
			BackoffLimit: new(int32), // Never retry
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Name: "velero-repo-maintenance-pod",
					Labels: map[string]string{
						RepositoryNameLabel: repo.Name,
					},
				},
				Spec: v1.PodSpec{
					Containers:         []v1.Container{maintenanceContainer},
					RestartPolicy:      v1.RestartPolicyNever,
					Volumes:            vols,
					ServiceAccountName: account,
				},
			},
		},
	}

	podTemplate := &job.Spec.Template

	if config != nil && len(config.LoadAffinities) > 0 {
		podTemplate.Spec.Affinity = kube.ToSystemAffinity(config.LoadAffinities)
	}

	if tolerations := veleroutil.GetTolerationsFromVeleroServer(server); tolerations != nil {
		podTemplate.Spec.Tolerations = tolerations
	}

	if nodeSelector := veleroutil.GetNodeSelectorFromVeleroServer(server); nodeSelector != nil {
		podTemplate.Spec.NodeSelector = nodeSelector
	}

	// NOTE(review): this replaces, rather than merges with, the template
	// labels set above, so the repo-name label is dropped from the pod
	// template whenever the helper returns labels. Confirm this is intended.
	if labels := veleroutil.GetVeleroServerLables(server); len(labels) > 0 {
		podTemplate.Labels = labels
	}

	if annotations := veleroutil.GetVeleroServerAnnotations(server); len(annotations) > 0 {
		podTemplate.Annotations = annotations
	}

	return job, nil
}
// composeStatusFromJob converts a maintenance job into a
// BackupRepositoryMaintenanceStatus entry carrying the given message.
//
// Result is Failed when the job reports at least one failed pod and Succeeded
// otherwise. StartTimestamp is a copy of the job's creation timestamp.
//
// CompleteTimestamp is a copy of job.Status.CompletionTime when that field is
// set. Kubernetes only populates CompletionTime for jobs that completed
// successfully, so for other jobs the transition time of the job's "Failed"
// condition is used instead; when neither is available CompleteTimestamp is
// left nil rather than dereferencing a nil pointer.
func composeStatusFromJob(job *batchv1.Job, message string) velerov1api.BackupRepositoryMaintenanceStatus {
	result := velerov1api.BackupRepositoryMaintenanceSucceeded
	if job.Status.Failed > 0 {
		result = velerov1api.BackupRepositoryMaintenanceFailed
	}

	var completeTime *metav1.Time
	if job.Status.CompletionTime != nil {
		completeTime = &metav1.Time{Time: job.Status.CompletionTime.Time}
	} else {
		// No completion time recorded: fall back to the moment the job was
		// marked as failed, if the job carries such a condition.
		for i := range job.Status.Conditions {
			cond := &job.Status.Conditions[i]
			if cond.Type == batchv1.JobFailed && cond.Status == v1.ConditionTrue && !cond.LastTransitionTime.IsZero() {
				completeTime = &metav1.Time{Time: cond.LastTransitionTime.Time}
				break
			}
		}
	}

	return velerov1api.BackupRepositoryMaintenanceStatus{
		Result:            result,
		StartTimestamp:    &metav1.Time{Time: job.CreationTimestamp.Time},
		CompleteTimestamp: completeTime,
		Message:           message,
	}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,460 +0,0 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package repository
import (
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
// TestGenerateJobName1 checks that GenerateJobName produces a name that starts
// with the expected prefix and never exceeds 63 characters, for both a short
// repository name and a 60-character one.
func TestGenerateJobName1(t *testing.T) {
	// Upper bound the generated name is checked against.
	const maxNameLen = 63

	type scenario struct {
		repo          string
		expectedStart string
	}
	scenarios := []scenario{
		{repo: "example", expectedStart: "example-maintain-job-"},
		// For a 60-character repository name the test expects the generic
		// "repo-maintain-job-" prefix rather than the repository name.
		{repo: strings.Repeat("a", 60), expectedStart: "repo-maintain-job-"},
	}

	for idx := range scenarios {
		sc := scenarios[idx]
		t.Run(sc.repo, func(t *testing.T) {
			generated := GenerateJobName(sc.repo)

			hasPrefix := strings.HasPrefix(generated, sc.expectedStart)
			if !hasPrefix {
				t.Errorf("generated job name does not start with expected prefix")
			}

			tooLong := len(generated) > maxNameLen
			if tooLong {
				t.Errorf("generated job name exceeds Kubernetes limit")
			}
		})
	}
}
func TestDeleteOldMaintenanceJobs(t *testing.T) {
// Set up test repo and keep value
repo := "test-repo"
keep := 2
// Create some maintenance jobs for testing
var objs []client.Object
// Create a newer job
newerJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job1",
Namespace: "default",
Labels: map[string]string{RepositoryNameLabel: repo},
},
Spec: batchv1.JobSpec{},
}
objs = append(objs, newerJob)
// Create older jobs
for i := 2; i <= 3; i++ {
olderJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("job%d", i),
Namespace: "default",
Labels: map[string]string{RepositoryNameLabel: repo},
CreationTimestamp: metav1.Time{
Time: metav1.Now().Add(time.Duration(-24*i) * time.Hour),
},
},
Spec: batchv1.JobSpec{},
}
objs = append(objs, olderJob)
}
// Create a fake Kubernetes client
scheme := runtime.NewScheme()
_ = batchv1.AddToScheme(scheme)
cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build()
// Call the function
err := DeleteOldMaintenanceJobs(cli, repo, keep)
assert.NoError(t, err)
// Get the remaining jobs
jobList := &batchv1.JobList{}
err = cli.List(context.TODO(), jobList, client.MatchingLabels(map[string]string{RepositoryNameLabel: repo}))
assert.NoError(t, err)
// We expect the number of jobs to be equal to 'keep'
assert.Len(t, jobList.Items, keep)
// We expect that the oldest jobs were deleted
// Job3 should not be present in the remaining list
assert.NotContains(t, jobList.Items, objs[2])
// Job2 should also not be present in the remaining list
assert.NotContains(t, jobList.Items, objs[1])
}
// TestWaitForJobComplete verifies that WaitForJobComplete returns no error for
// a job reporting a succeeded pod and an error for a job reporting a failed
// pod.
func TestWaitForJobComplete(t *testing.T) {
	// A single job object is reused by every scenario; only its status changes.
	sharedJob := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-job",
			Namespace: "default",
		},
		Status: batchv1.JobStatus{},
	}

	scenarios := []struct {
		description string                    // sub-test name
		jobStatus   batchv1.JobStatus         // status stored on the job before the call
		verifyErr   assert.ErrorAssertionFunc // assertion applied to the returned error
	}{
		{
			description: "Job Succeeded",
			jobStatus:   batchv1.JobStatus{Succeeded: 1},
			verifyErr:   assert.NoError,
		},
		{
			description: "Job Failed",
			jobStatus:   batchv1.JobStatus{Failed: 1},
			verifyErr:   assert.Error,
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.description, func(t *testing.T) {
			// Apply the scenario's status, then seed a fresh fake client with the job.
			sharedJob.Status = sc.jobStatus
			cli := fake.NewClientBuilder().WithObjects(sharedJob).Build()

			waitErr := WaitForJobComplete(context.Background(), cli, sharedJob)

			sc.verifyErr(t, waitErr)
		})
	}
}
// TestGetMaintenanceResultFromJob walks the job's pod through three states and
// checks what GetMaintenanceResultFromJob returns for each: an error for a pod
// without container statuses, an error for a container that has not
// terminated, and the termination message once the container has terminated.
func TestGetMaintenanceResultFromJob(t *testing.T) {
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-job",
			Namespace: "default",
		},
	}

	// The pod is tied to the job through the "job-name" label.
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-pod",
			Namespace: "default",
			Labels:    map[string]string{"job-name": job.Name},
		},
	}

	steps := []struct {
		status     v1.PodStatus              // pod status to apply before the call
		verifyErr  assert.ErrorAssertionFunc // assertion applied to the returned error
		wantResult string                    // expected returned message
	}{
		{
			// No status at all.
			status:     v1.PodStatus{},
			verifyErr:  assert.Error,
			wantResult: "",
		},
		{
			// A container status that is not terminated.
			status: v1.PodStatus{
				ContainerStatuses: []v1.ContainerStatus{
					{State: v1.ContainerState{}},
				},
			},
			verifyErr:  assert.Error,
			wantResult: "",
		},
		{
			// A terminated container carrying a message.
			status: v1.PodStatus{
				ContainerStatuses: []v1.ContainerStatus{
					{
						State: v1.ContainerState{
							Terminated: &v1.ContainerStateTerminated{
								Message: "test message",
							},
						},
					},
				},
			},
			verifyErr:  assert.NoError,
			wantResult: "test message",
		},
	}

	for _, step := range steps {
		// Each step rebuilds the fake client from the same job and pod objects.
		pod.Status = step.status
		cli := fake.NewClientBuilder().WithObjects(job, pod).Build()

		message, err := GetMaintenanceResultFromJob(cli, job)
		step.verifyErr(t, err)
		assert.Equal(t, step.wantResult, message)
	}
}
func TestGetLatestMaintenanceJob(t *testing.T) {
// Set up test repo
repo := "test-repo"
// Create some maintenance jobs for testing
var objs []client.Object
// Create a newer job
newerJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job1",
Namespace: "default",
Labels: map[string]string{RepositoryNameLabel: repo},
CreationTimestamp: metav1.Time{
Time: metav1.Now().Add(time.Duration(-24) * time.Hour),
},
},
Spec: batchv1.JobSpec{},
}
objs = append(objs, newerJob)
// Create an older job
olderJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job2",
Namespace: "default",
Labels: map[string]string{RepositoryNameLabel: repo},
},
Spec: batchv1.JobSpec{},
}
objs = append(objs, olderJob)
// Create a fake Kubernetes client
scheme := runtime.NewScheme()
_ = batchv1.AddToScheme(scheme)
cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build()
// Call the function
job, err := GetLatestMaintenanceJob(cli, "default")
assert.NoError(t, err)
// We expect the returned job to be the newer job
assert.Equal(t, newerJob.Name, job.Name)
}
// TestGetMaintenanceJobConfig exercises GetMaintenanceJobConfig against a fake
// client: with no ConfigMap present, with a ConfigMap holding malformed JSON,
// with a repository-specific entry, with a global entry, and with both (where
// the repository-specific entry is expected to win).
func TestGetMaintenanceJobConfig(t *testing.T) {
	const (
		veleroNamespace          = "velero"
		repoMaintenanceJobConfig = "repo-maintenance-job-config"

		// ConfigMap key used for the repository-specific entry in this test.
		repoKey = "test-default-kopia"

		// Deliberately malformed JSON (unbalanced quotes).
		brokenJSON = `{"cpuRequest:"100m"}`
		// Repository-specific settings.
		repoJSON = `{"podResources":{"cpuRequest":"100m","cpuLimit":"200m","memoryRequest":"100Mi","memoryLimit":"200Mi"},"loadAffinity":[{"nodeSelector":{"matchExpressions":[{"key":"cloud.google.com/machine-family","operator":"In","values":["e2"]}]}}]}`
		// Global settings.
		globalJSON = `{"podResources":{"cpuRequest":"50m","cpuLimit":"100m","memoryRequest":"50Mi","memoryLimit":"100Mi"},"loadAffinity":[{"nodeSelector":{"matchExpressions":[{"key":"cloud.google.com/machine-family","operator":"In","values":["n2"]}]}}]}`
	)

	ctx := context.Background()
	logger := logrus.New()

	repo := &velerov1api.BackupRepository{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: veleroNamespace,
			Name:      repoMaintenanceJobConfig,
		},
		Spec: velerov1api.BackupRepositorySpec{
			BackupStorageLocation: "default",
			RepositoryType:        "kopia",
			VolumeNamespace:       "test",
		},
	}

	// configMapWith wraps data in the ConfigMap the function under test reads.
	configMapWith := func(data map[string]string) *v1.ConfigMap {
		return &v1.ConfigMap{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: veleroNamespace,
				Name:      repoMaintenanceJobConfig,
			},
			Data: data,
		}
	}

	// wantConfigs builds the JobConfigs expected after decoding one of the
	// JSON documents above.
	wantConfigs := func(cpuRequest, cpuLimit, memRequest, memLimit, machineFamily string) *JobConfigs {
		return &JobConfigs{
			PodResources: &kube.PodResources{
				CPURequest:    cpuRequest,
				CPULimit:      cpuLimit,
				MemoryRequest: memRequest,
				MemoryLimit:   memLimit,
			},
			LoadAffinities: []*kube.LoadAffinity{
				{
					NodeSelector: metav1.LabelSelector{
						MatchExpressions: []metav1.LabelSelectorRequirement{
							{
								Key:      "cloud.google.com/machine-family",
								Operator: metav1.LabelSelectorOpIn,
								Values:   []string{machineFamily},
							},
						},
					},
				},
			},
		}
	}

	scenarios := []struct {
		name           string
		repoJobConfig  *v1.ConfigMap
		expectedConfig *JobConfigs
		expectedError  error
	}{
		{
			name:           "Config not exist",
			expectedConfig: nil,
			expectedError:  nil,
		},
		{
			name:           "Invalid JSON",
			repoJobConfig:  configMapWith(map[string]string{repoKey: brokenJSON}),
			expectedConfig: nil,
			expectedError:  fmt.Errorf("fail to unmarshal configs from %s", repoMaintenanceJobConfig),
		},
		{
			name:           "Find config specific for BackupRepository",
			repoJobConfig:  configMapWith(map[string]string{repoKey: repoJSON}),
			expectedConfig: wantConfigs("100m", "200m", "100Mi", "200Mi", "e2"),
			expectedError:  nil,
		},
		{
			name:           "Find config specific for global",
			repoJobConfig:  configMapWith(map[string]string{GlobalKeyForRepoMaintenanceJobCM: globalJSON}),
			expectedConfig: wantConfigs("50m", "100m", "50Mi", "100Mi", "n2"),
			expectedError:  nil,
		},
		{
			name: "Specific config supersede global config",
			repoJobConfig: configMapWith(map[string]string{
				GlobalKeyForRepoMaintenanceJobCM: globalJSON,
				repoKey:                          repoJSON,
			}),
			expectedConfig: wantConfigs("100m", "200m", "100Mi", "200Mi", "e2"),
			expectedError:  nil,
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			// Seed the fake client with the ConfigMap only when the scenario has one.
			var fakeClient client.Client
			switch sc.repoJobConfig {
			case nil:
				fakeClient = velerotest.NewFakeControllerRuntimeClient(t)
			default:
				fakeClient = velerotest.NewFakeControllerRuntimeClient(t, sc.repoJobConfig)
			}

			got, err := GetMaintenanceJobConfig(
				ctx,
				fakeClient,
				logger,
				veleroNamespace,
				repoMaintenanceJobConfig,
				repo,
			)

			if sc.expectedError == nil {
				require.NoError(t, err)
			} else {
				require.ErrorContains(t, err, sc.expectedError.Error())
			}
			require.Equal(t, sc.expectedConfig, got)
		})
	}
}

View File

@@ -23,11 +23,6 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
@@ -35,9 +30,6 @@ import (
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/repository/provider"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/logging"
veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero"
)
// Manager manages backup repositories.
@@ -76,16 +68,10 @@ type manager struct {
providers map[string]provider.Provider
// client is the Velero controller manager's client.
// It's limited to resources in the Velero namespace.
client client.Client
repoLocker *repository.RepoLocker
repoEnsurer *repository.Ensurer
fileSystem filesystem.Interface
repoMaintenanceJobConfig string
podResources kube.PodResources
keepLatestMaintenanceJobs int
log logrus.FieldLogger
logLevel logrus.Level
logFormat *logging.FormatFlag
client client.Client
repoLocker *repository.RepoLocker
fileSystem filesystem.Interface
log logrus.FieldLogger
}
// NewManager create a new repository manager.
@@ -93,29 +79,17 @@ func NewManager(
namespace string,
client client.Client,
repoLocker *repository.RepoLocker,
repoEnsurer *repository.Ensurer,
credentialFileStore credentials.FileStore,
credentialSecretStore credentials.SecretStore,
repoMaintenanceJobConfig string,
podResources kube.PodResources,
keepLatestMaintenanceJobs int,
log logrus.FieldLogger,
logLevel logrus.Level,
logFormat *logging.FormatFlag,
) Manager {
mgr := &manager{
namespace: namespace,
client: client,
providers: map[string]provider.Provider{},
repoLocker: repoLocker,
repoEnsurer: repoEnsurer,
fileSystem: filesystem.NewFileSystem(),
repoMaintenanceJobConfig: repoMaintenanceJobConfig,
podResources: podResources,
keepLatestMaintenanceJobs: keepLatestMaintenanceJobs,
log: log,
logLevel: logLevel,
logFormat: logFormat,
namespace: namespace,
client: client,
providers: map[string]provider.Provider{},
repoLocker: repoLocker,
fileSystem: filesystem.NewFileSystem(),
log: log,
}
mgr.providers[velerov1api.BackupRepositoryTypeRestic] = provider.NewResticRepositoryProvider(credentialFileStore, mgr.fileSystem, mgr.log)
@@ -176,91 +150,20 @@ func (m *manager) PruneRepo(repo *velerov1api.BackupRepository) error {
m.repoLocker.LockExclusive(repo.Name)
defer m.repoLocker.UnlockExclusive(repo.Name)
prd, err := m.getRepositoryProvider(repo)
if err != nil {
return errors.WithStack(err)
}
param, err := m.assembleRepoParam(repo)
if err != nil {
return errors.WithStack(err)
}
log := m.log.WithFields(logrus.Fields{
"BSL name": param.BackupLocation.Name,
"repo type": param.BackupRepo.Spec.RepositoryType,
"repo name": param.BackupRepo.Name,
"repo UID": param.BackupRepo.UID,
})
job, err := repository.GetLatestMaintenanceJob(m.client, m.namespace)
if err != nil {
if err := prd.BoostRepoConnect(context.Background(), param); err != nil {
return errors.WithStack(err)
}
if job != nil && job.Status.Succeeded == 0 && job.Status.Failed == 0 {
log.Debugf("There already has a unfinished maintenance job %s/%s for repository %s, please wait for it to complete", job.Namespace, job.Name, param.BackupRepo.Name)
return nil
}
jobConfig, err := repository.GetMaintenanceJobConfig(
context.Background(),
m.client,
m.log,
m.namespace,
m.repoMaintenanceJobConfig,
repo,
)
if err != nil {
log.Infof("Fail to find the ConfigMap %s to build maintenance job with error: %s. Use default value.",
m.namespace+"/"+m.repoMaintenanceJobConfig,
err.Error(),
)
}
log.Info("Start to maintenance repo")
maintenanceJob, err := m.buildMaintenanceJob(
jobConfig,
param,
)
if err != nil {
return errors.Wrap(err, "error to build maintenance job")
}
log = log.WithField("job", fmt.Sprintf("%s/%s", maintenanceJob.Namespace, maintenanceJob.Name))
if err := m.client.Create(context.TODO(), maintenanceJob); err != nil {
return errors.Wrap(err, "error to create maintenance job")
}
log.Debug("Creating maintenance job")
defer func() {
if err := repository.DeleteOldMaintenanceJobs(
m.client,
param.BackupRepo.Name,
m.keepLatestMaintenanceJobs,
); err != nil {
log.WithError(err).Error("Failed to delete maintenance job")
}
}()
var jobErr error
if err := repository.WaitForJobComplete(context.TODO(), m.client, maintenanceJob); err != nil {
log.WithError(err).Error("Error to wait for maintenance job complete")
jobErr = err // we won't return here for job may failed by maintenance failure, we want return the actual error
}
result, err := repository.GetMaintenanceResultFromJob(m.client, maintenanceJob)
if err != nil {
return errors.Wrap(err, "error to get maintenance job result")
}
if result != "" {
return errors.New(fmt.Sprintf("Maintenance job %s failed: %s", maintenanceJob.Name, result))
}
if jobErr != nil {
return errors.Wrap(jobErr, "error to wait for maintenance job complete")
}
log.Info("Maintenance repo complete")
return nil
return prd.PruneRepo(context.Background(), param)
}
func (m *manager) UnlockRepo(repo *velerov1api.BackupRepository) error {
@@ -353,122 +256,3 @@ func (m *manager) assembleRepoParam(repo *velerov1api.BackupRepository) (provide
BackupRepo: repo,
}, nil
}
// buildMaintenanceJob assembles, without creating it, the batch Job that runs
// "/velero repo-maintenance" for the repository described by param.
//
// The job's single container takes its image, environment, volumes, volume
// mounts and service account from the deployment named "velero" in the
// manager's namespace. Resource requests and limits come from
// config.PodResources when present and from the manager's podResources
// otherwise. Load affinity from config, plus tolerations, node selector, pod
// labels and pod annotations from the deployment, are applied when present.
//
// An error is returned when the deployment cannot be fetched or when the
// resource quantities cannot be parsed.
func (m *manager) buildMaintenanceJob(
	config *repository.JobConfigs,
	param provider.RepoParam,
) (*batchv1.Job, error) {
	// Fetch the Velero server deployment the job pod is modelled on.
	veleroDeploy := &appsv1.Deployment{}
	deployKey := types.NamespacedName{Name: "velero", Namespace: m.namespace}
	if err := m.client.Get(context.TODO(), deployKey, veleroDeploy); err != nil {
		return nil, err
	}

	// Settings copied from the Velero server deployment.
	serverEnv := veleroutil.GetEnvVarsFromVeleroServer(veleroDeploy)
	serverEnvFrom := veleroutil.GetEnvFromSourcesFromVeleroServer(veleroDeploy)
	serverMounts := veleroutil.GetVolumeMountsFromVeleroServer(veleroDeploy)
	serverVolumes := veleroutil.GetVolumesFromVeleroServer(veleroDeploy)
	serverSA := veleroutil.GetServiceAccountFromVeleroServer(veleroDeploy)
	serverImage := veleroutil.GetVeleroServerImage(veleroDeploy)

	// Manager-level resource settings apply unless the job config overrides them.
	cpuReq, memReq := m.podResources.CPURequest, m.podResources.MemoryRequest
	cpuLim, memLim := m.podResources.CPULimit, m.podResources.MemoryLimit
	if config != nil && config.PodResources != nil {
		override := config.PodResources
		cpuReq, memReq = override.CPURequest, override.MemoryRequest
		cpuLim, memLim = override.CPULimit, override.MemoryLimit
	}
	requirements, err := kube.ParseResourceRequirements(cpuReq, memReq, cpuLim, memLim)
	if err != nil {
		return nil, errors.Wrap(err, "failed to parse resource requirements for maintenance job")
	}

	// Command-line arguments of the repo-maintenance sub-command.
	cmdArgs := []string{
		"repo-maintenance",
		fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace),
		fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType),
		fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name),
		fmt.Sprintf("--log-level=%s", m.logLevel.String()),
		fmt.Sprintf("--log-format=%s", m.logFormat.String()),
	}

	maintenanceContainer := v1.Container{
		Name:            "velero-repo-maintenance-container",
		Image:           serverImage,
		Command:         []string{"/velero"},
		Args:            cmdArgs,
		ImagePullPolicy: v1.PullIfNotPresent,
		Env:             serverEnv,
		EnvFrom:         serverEnvFrom,
		VolumeMounts:    serverMounts,
		Resources:       requirements,
	}

	// A zero backoff limit means the job is never retried.
	noRetry := new(int32)

	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      repository.GenerateJobName(param.BackupRepo.Name),
			Namespace: param.BackupRepo.Namespace,
			Labels: map[string]string{
				repository.RepositoryNameLabel: param.BackupRepo.Name,
			},
		},
		Spec: batchv1.JobSpec{
			BackoffLimit: noRetry,
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Name: "velero-repo-maintenance-pod",
					Labels: map[string]string{
						repository.RepositoryNameLabel: param.BackupRepo.Name,
					},
				},
				Spec: v1.PodSpec{
					Containers:         []v1.Container{maintenanceContainer},
					RestartPolicy:      v1.RestartPolicyNever,
					Volumes:            serverVolumes,
					ServiceAccountName: serverSA,
				},
			},
		},
	}
	podTemplate := &job.Spec.Template

	// Optional scheduling constraints from the job config.
	if config != nil && len(config.LoadAffinities) > 0 {
		podTemplate.Spec.Affinity = kube.ToSystemAffinity(config.LoadAffinities)
	}

	// Optional scheduling settings and metadata copied from the server deployment.
	if serverTolerations := veleroutil.GetTolerationsFromVeleroServer(veleroDeploy); serverTolerations != nil {
		podTemplate.Spec.Tolerations = serverTolerations
	}
	if serverNodeSelector := veleroutil.GetNodeSelectorFromVeleroServer(veleroDeploy); serverNodeSelector != nil {
		podTemplate.Spec.NodeSelector = serverNodeSelector
	}
	// When the server has labels they replace the pod template's label map
	// built above, rather than being merged into it.
	if serverLabels := veleroutil.GetVeleroServerLables(veleroDeploy); len(serverLabels) > 0 {
		podTemplate.Labels = serverLabels
	}
	if serverAnnotations := veleroutil.GetVeleroServerAnnotations(veleroDeploy); len(serverAnnotations) > 0 {
		podTemplate.Annotations = serverAnnotations
	}

	return job, nil
}

View File

@@ -17,31 +17,18 @@ limitations under the License.
package repository
import (
"fmt"
"testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/repository/provider"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/logging"
)
func TestGetRepositoryProvider(t *testing.T) {
var fakeClient kbclient.Client
mgr := NewManager("", fakeClient, nil, nil, nil, nil, "", kube.PodResources{}, 3, nil, logrus.InfoLevel, nil).(*manager)
mgr := NewManager("", fakeClient, nil, nil, nil, nil).(*manager)
repo := &velerov1.BackupRepository{}
// empty repository type
@@ -60,220 +47,3 @@ func TestGetRepositoryProvider(t *testing.T) {
_, err = mgr.getRepositoryProvider(repo)
require.Error(t, err)
}
// TestBuildMaintenanceJob checks manager.buildMaintenanceJob in two
// situations: when the "velero" deployment exists (the job must mirror its
// image, env and envFrom, and carry the configured resources and arguments)
// and when it does not exist (an error and a nil job are expected).
func TestBuildMaintenanceJob(t *testing.T) {
	// Fixture builders return fresh values on every call so the deployment
	// given to the fake client and the expectations never share storage.
	newJobConfigs := func() *repository.JobConfigs {
		return &repository.JobConfigs{
			PodResources: &kube.PodResources{
				CPURequest:    "100m",
				MemoryRequest: "128Mi",
				CPULimit:      "200m",
				MemoryLimit:   "256Mi",
			},
		}
	}
	newEnv := func() []v1.EnvVar {
		return []v1.EnvVar{
			{Name: "test-name", Value: "test-value"},
		}
	}
	newEnvFrom := func() []v1.EnvFromSource {
		return []v1.EnvFromSource{
			{
				ConfigMapRef: &v1.ConfigMapEnvSource{
					LocalObjectReference: v1.LocalObjectReference{Name: "test-configmap"},
				},
			},
			{
				SecretRef: &v1.SecretEnvSource{
					LocalObjectReference: v1.LocalObjectReference{Name: "test-secret"},
				},
			},
		}
	}

	// Deployment standing in for the Velero server.
	veleroDeploy := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "velero",
			Namespace: "velero",
		},
		Spec: appsv1.DeploymentSpec{
			Template: v1.PodTemplateSpec{
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:    "velero-repo-maintenance-container",
							Image:   "velero-image",
							Env:     newEnv(),
							EnvFrom: newEnvFrom(),
						},
					},
				},
			},
		},
	}

	scenarios := []struct {
		name            string
		m               *repository.JobConfigs
		deploy          *appsv1.Deployment
		logLevel        logrus.Level
		logFormat       *logging.FormatFlag
		expectedJobName string
		expectedError   bool
		expectedEnv     []v1.EnvVar
		expectedEnvFrom []v1.EnvFromSource
	}{
		{
			name:            "Valid maintenance job",
			m:               newJobConfigs(),
			deploy:          veleroDeploy,
			logLevel:        logrus.InfoLevel,
			logFormat:       logging.NewFormatFlag(),
			expectedJobName: "test-123-maintain-job",
			expectedError:   false,
			expectedEnv:     newEnv(),
			expectedEnvFrom: newEnvFrom(),
		},
		{
			// No deployment is seeded, so the lookup must fail.
			name:            "Error getting Velero server deployment",
			m:               newJobConfigs(),
			logLevel:        logrus.InfoLevel,
			logFormat:       logging.NewFormatFlag(),
			expectedJobName: "",
			expectedError:   true,
		},
	}

	param := provider.RepoParam{
		BackupRepo: &velerov1api.BackupRepository{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: "velero",
				Name:      "test-123",
			},
			Spec: velerov1api.BackupRepositorySpec{
				VolumeNamespace: "test-123",
				RepositoryType:  "kopia",
			},
		},
		BackupLocation: &velerov1api.BackupStorageLocation{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: "velero",
				Name:      "test-location",
			},
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			// Seed the fake client; the deployment is optional per scenario.
			seed := []runtime.Object{param.BackupLocation, param.BackupRepo}
			if sc.deploy != nil {
				seed = append(seed, sc.deploy)
			}
			scheme := runtime.NewScheme()
			_ = appsv1.AddToScheme(scheme)
			_ = velerov1api.AddToScheme(scheme)
			cli := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(seed...).Build()

			mgr := NewManager(
				"velero",
				cli,
				nil,
				nil,
				nil,
				nil,
				"",
				kube.PodResources{},
				3,
				nil,
				logrus.InfoLevel,
				logging.NewFormatFlag(),
			).(*manager)

			job, err := mgr.buildMaintenanceJob(sc.m, param)

			// Failure path: nothing more to inspect.
			if sc.expectedError {
				assert.Error(t, err)
				assert.Nil(t, job)
				return
			}

			assert.NoError(t, err)
			assert.NotNil(t, job)

			// Job metadata.
			assert.Contains(t, job.Name, sc.expectedJobName)
			assert.Equal(t, param.BackupRepo.Namespace, job.Namespace)
			assert.Equal(t, param.BackupRepo.Name, job.Labels[repository.RepositoryNameLabel])
			assert.Equal(t, param.BackupRepo.Name, job.Spec.Template.Labels[repository.RepositoryNameLabel])

			// Exactly one container, mirroring the server deployment.
			podSpec := job.Spec.Template.Spec
			assert.Len(t, podSpec.Containers, 1)
			got := podSpec.Containers[0]
			assert.Equal(t, "velero-repo-maintenance-container", got.Name)
			assert.Equal(t, "velero-image", got.Image)
			assert.Equal(t, v1.PullIfNotPresent, got.ImagePullPolicy)

			// Environment copied from the deployment.
			assert.Equal(t, sc.expectedEnv, got.Env)
			assert.Equal(t, sc.expectedEnvFrom, got.EnvFrom)

			// Resources taken from the job config.
			wantRes := sc.m.PodResources
			assert.Equal(t, v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceCPU:    resource.MustParse(wantRes.CPURequest),
					v1.ResourceMemory: resource.MustParse(wantRes.MemoryRequest),
				},
				Limits: v1.ResourceList{
					v1.ResourceCPU:    resource.MustParse(wantRes.CPULimit),
					v1.ResourceMemory: resource.MustParse(wantRes.MemoryLimit),
				},
			}, got.Resources)

			// Command-line arguments.
			assert.Equal(t, []string{
				"repo-maintenance",
				fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace),
				fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType),
				fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name),
				fmt.Sprintf("--log-level=%s", sc.logLevel.String()),
				fmt.Sprintf("--log-format=%s", sc.logFormat.String()),
			}, got.Args)

			// Nothing scheduling-related was configured, so these stay unset.
			assert.Nil(t, podSpec.Affinity)
			assert.Nil(t, podSpec.Tolerations)
			assert.Nil(t, podSpec.NodeSelector)
		})
	}
}

View File

@@ -62,3 +62,19 @@ func NewSingleLoggerWithHooks(buffer *string, hooks []logrus.Hook) logrus.FieldL
return logrus.NewEntry(logger)
}
// multipleLogRecorder collects every chunk written to it as a separate string
// in the slice that buffer points to.
type multipleLogRecorder struct {
	buffer *[]string
}

// Write appends a string copy of p to the recorded slice and reports the whole
// input as written; it never returns an error.
func (r *multipleLogRecorder) Write(p []byte) (n int, err error) {
	entry := string(p)
	recorded := append(*r.buffer, entry)
	*r.buffer = recorded
	return len(p), nil
}
// NewMultipleLogger returns a logger whose output is captured in the slice that
// buffer points to, one element per write issued by the logger. The level is
// set to trace.
func NewMultipleLogger(buffer *[]string) logrus.FieldLogger {
	recorder := &multipleLogRecorder{buffer}

	base := logrus.New()
	base.Level = logrus.TraceLevel
	base.Out = recorder

	return logrus.NewEntry(base)
}