repo maintenance job out of repo manager

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
Lyndon-Li
2025-01-06 16:25:33 +08:00
parent 912b116bdb
commit db69829fd7
9 changed files with 781 additions and 586 deletions

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"os"
"strings"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -12,21 +13,25 @@ import (
"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerocli "github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/repository/provider"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"
repokey "github.com/vmware-tanzu/velero/pkg/repository/keys"
repomanager "github.com/vmware-tanzu/velero/pkg/repository/manager"
)
type Options struct {
RepoName string
BackupStorageLocation string
RepoType string
ResourceTimeout time.Duration
LogLevelFlag *logging.LevelFlag
FormatFlag *logging.FormatFlag
}
@@ -83,39 +88,46 @@ func (o *Options) Run(f velerocli.Factory) {
}
}
func (o *Options) initClient(f velerocli.Factory) (client.Client, error) {
func (o *Options) initClient(f velerocli.Factory) (client.Client, kubernetes.Interface, error) {
scheme := runtime.NewScheme()
err := velerov1api.AddToScheme(scheme)
if err != nil {
return nil, errors.Wrap(err, "failed to add velero scheme")
return nil, nil, errors.Wrap(err, "failed to add velero scheme")
}
err = v1.AddToScheme(scheme)
if err != nil {
return nil, errors.Wrap(err, "failed to add api core scheme")
return nil, nil, errors.Wrap(err, "failed to add api core scheme")
}
config, err := f.ClientConfig()
if err != nil {
return nil, errors.Wrap(err, "failed to get client config")
return nil, nil, errors.Wrap(err, "failed to get client config")
}
cli, err := client.New(config, client.Options{
Scheme: scheme,
})
if err != nil {
return nil, errors.Wrap(err, "failed to create client")
return nil, nil, errors.Wrap(err, "failed to create client")
}
return cli, nil
kubeClient, err := f.KubeClient()
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create kube client")
}
return cli, kubeClient, nil
}
func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger logrus.FieldLogger) error {
cli, err := o.initClient(f)
if err != nil {
return err
func initRepoManager(namespace string, kubeClient kubernetes.Interface, cli client.Client, logger logrus.FieldLogger) (repomanager.Manager, error) {
// ensure the repo key secret is set up
if err := repokey.EnsureCommonRepositoryKey(kubeClient.CoreV1(), namespace); err != nil {
return nil, errors.Wrap(err, "failed to ensure repository key")
}
repoLocker := repository.NewRepoLocker()
credentialFileStore, err := credentials.NewNamespacedFileStore(
cli,
namespace,
@@ -123,23 +135,33 @@ func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger log
filesystem.NewFileSystem(),
)
if err != nil {
return errors.Wrap(err, "failed to create namespaced file store")
return nil, errors.Wrap(err, "failed to create namespaced file store")
}
credentialSecretStore, err := credentials.NewNamespacedSecretStore(cli, namespace)
if err != nil {
return errors.Wrap(err, "failed to create namespaced secret store")
return nil, errors.Wrap(err, "failed to create namespaced secret store")
}
var repoProvider provider.Provider
if o.RepoType == velerov1api.BackupRepositoryTypeRestic {
repoProvider = provider.NewResticRepositoryProvider(credentialFileStore, filesystem.NewFileSystem(), logger)
} else {
repoProvider = provider.NewUnifiedRepoProvider(
credentials.CredentialGetter{
FromFile: credentialFileStore,
FromSecret: credentialSecretStore,
}, o.RepoType, logger)
return repomanager.NewManager(
namespace,
cli,
repoLocker,
credentialFileStore,
credentialSecretStore,
logger,
), nil
}
func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger logrus.FieldLogger) error {
cli, kubeClient, err := o.initClient(f)
if err != nil {
return err
}
manager, err := initRepoManager(namespace, kubeClient, cli, logger)
if err != nil {
return err
}
// backupRepository
@@ -149,31 +171,14 @@ func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger log
BackupLocation: o.BackupStorageLocation,
RepositoryType: o.RepoType,
}, true)
if err != nil {
return errors.Wrap(err, "failed to get backup repository")
}
// bsl
bsl := &velerov1api.BackupStorageLocation{}
err = cli.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: repo.Spec.BackupStorageLocation}, bsl)
if err != nil {
return errors.Wrap(err, "failed to get backup storage location")
}
para := provider.RepoParam{
BackupRepo: repo,
BackupLocation: bsl,
}
err = repoProvider.BoostRepoConnect(context.Background(), para)
if err != nil {
return errors.Wrap(err, "failed to boost repo connect")
}
err = repoProvider.PruneRepo(context.Background(), para)
err = manager.PruneRepo(repo)
if err != nil {
return errors.Wrap(err, "failed to prune repo")
}
return nil
}

View File

@@ -491,15 +491,9 @@ func (s *server) initRepoManager() error {
s.namespace,
s.mgr.GetClient(),
s.repoLocker,
s.repoEnsurer,
s.credentialFileStore,
s.credentialSecretStore,
s.config.RepoMaintenanceJobConfig,
s.config.PodResources,
s.config.KeepLatestMaintenanceJobs,
s.logger,
s.logLevel,
s.config.LogFormat,
)
return nil
@@ -720,9 +714,14 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.namespace,
s.logger,
s.mgr.GetClient(),
s.repoManager,
s.config.RepoMaintenanceFrequency,
s.config.BackupRepoConfig,
s.repoManager,
s.config.KeepLatestMaintenanceJobs,
s.config.RepoMaintenanceJobConfig,
s.config.PodResources,
s.logLevel,
s.config.LogFormat,
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerBackupRepo)
}

View File

@@ -45,6 +45,7 @@ import (
repoconfig "github.com/vmware-tanzu/velero/pkg/repository/config"
repomanager "github.com/vmware-tanzu/velero/pkg/repository/manager"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/logging"
)
const (
@@ -55,16 +56,22 @@ const (
type BackupRepoReconciler struct {
client.Client
namespace string
logger logrus.FieldLogger
clock clocks.WithTickerAndDelayedExecution
maintenanceFrequency time.Duration
backupRepoConfig string
repositoryManager repomanager.Manager
namespace string
logger logrus.FieldLogger
clock clocks.WithTickerAndDelayedExecution
maintenanceFrequency time.Duration
backupRepoConfig string
repositoryManager repomanager.Manager
keepLatestMaintenanceJobs int
repoMaintenanceConfig string
podResources kube.PodResources
logLevel logrus.Level
logFormat *logging.FormatFlag
}
func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client client.Client,
maintenanceFrequency time.Duration, backupRepoConfig string, repositoryManager repomanager.Manager) *BackupRepoReconciler {
func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client client.Client, repositoryManager repomanager.Manager,
maintenanceFrequency time.Duration, backupRepoConfig string, keepLatestMaintenanceJobs int, repoMaintenanceConfig string, podResources kube.PodResources,
logLevel logrus.Level, logFormat *logging.FormatFlag) *BackupRepoReconciler {
c := &BackupRepoReconciler{
client,
namespace,
@@ -73,6 +80,11 @@ func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client
maintenanceFrequency,
backupRepoConfig,
repositoryManager,
keepLatestMaintenanceJobs,
repoMaintenanceConfig,
podResources,
logLevel,
logFormat,
}
return c
@@ -212,7 +224,13 @@ func (r *BackupRepoReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, errors.Wrap(err, "error handling incomplete repo maintenance jobs")
}
return ctrl.Result{}, r.runMaintenanceIfDue(ctx, backupRepo, log)
if err := r.runMaintenanceIfDue(ctx, backupRepo, log); err != nil {
return ctrl.Result{}, errors.Wrap(err, "error check and run repo maintenance jobs")
}
if err := repository.DeleteOldMaintenanceJobs(r.Client, req.Name, r.keepLatestMaintenanceJobs); err != nil {
log.WithError(err).Warn("Failed to delete old maintenance jobs")
}
}
return ctrl.Result{}, nil
@@ -306,7 +324,7 @@ func ensureRepo(repo *velerov1api.BackupRepository, repoManager repomanager.Mana
}
func (r *BackupRepoReconciler) recallMaintenance(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {
history, err := repository.WaitIncompleteMaintenance(ctx, r.Client, req, defaultMaintenanceStatusQueueLength, log)
history, err := repository.WaitAllMaintenanceJobComplete(ctx, r.Client, req, defaultMaintenanceStatusQueueLength, log)
if err != nil {
return errors.Wrapf(err, "error waiting incomplete repo maintenance job for repo %s", req.Name)
}
@@ -380,7 +398,11 @@ func getLastMaintenanceTimeFromHistory(history []velerov1api.BackupRepositoryMai
time := history[0].CompleteTimestamp
for i := range history {
if time.Before(history[i].CompleteTimestamp) {
if history[i].CompleteTimestamp == nil {
continue
}
if time == nil || time.Before(history[i].CompleteTimestamp) {
time = history[i].CompleteTimestamp
}
}
@@ -406,8 +428,13 @@ func isEarlierMaintenanceStatus(a, b velerov1api.BackupRepositoryMaintenanceStat
return a.StartTimestamp.Before(b.StartTimestamp)
}
var funcStartMaintenanceJob = repository.StartMaintenanceJob
var funcWaitMaintenanceJobComplete = repository.WaitMaintenanceJobComplete
func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {
if !dueForMaintenance(req, r.clock.Now()) {
startTime := r.clock.Now()
if !dueForMaintenance(req, startTime) {
log.Debug("not due for maintenance")
return nil
}
@@ -418,31 +445,39 @@ func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *vel
// should not cause the repo to move to `NotReady`.
log.Debug("Pruning repo")
// when PruneRepo fails, the maintenance result will be left temporarily
// If the maintenance still completes later, recallMaintenance recalls the left ones and updates LastMaintenanceTime and history
status, err := r.repositoryManager.PruneRepo(req)
job, err := funcStartMaintenanceJob(r.Client, ctx, req, r.repoMaintenanceConfig, r.podResources, r.logLevel, r.logFormat, log)
if err != nil {
return errors.Wrapf(err, "error pruning repository")
log.WithError(err).Warn("Starting repo maintenance failed")
return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceFailed, &metav1.Time{Time: startTime}, nil, fmt.Sprintf("Failed to start maintenance job, err: %v", err))
})
}
// when WaitMaintenanceJobComplete fails, the maintenance result will be left temporarily
// If the maintenance still completes later, recallMaintenance recalls the left ones and updates LastMaintenanceTime and history
status, err := funcWaitMaintenanceJobComplete(r.Client, ctx, job, r.namespace, log)
if err != nil {
return errors.Wrapf(err, "error waiting repo maintenance completion status")
}
if status.Result == velerov1api.BackupRepositoryMaintenanceFailed {
log.WithError(err).Warn("Pruning repository failed")
return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceFailed, status.StartTimestamp.Time, status.CompleteTimestamp.Time, status.Message)
updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceFailed, status.StartTimestamp, status.CompleteTimestamp, status.Message)
})
}
return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
rr.Status.LastMaintenanceTime = &metav1.Time{Time: status.CompleteTimestamp.Time}
updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceSucceeded, status.StartTimestamp.Time, status.CompleteTimestamp.Time, status.Message)
updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceSucceeded, status.StartTimestamp, status.CompleteTimestamp, status.Message)
})
}
func updateRepoMaintenanceHistory(repo *velerov1api.BackupRepository, result velerov1api.BackupRepositoryMaintenanceResult, startTime time.Time, completionTime time.Time, message string) {
func updateRepoMaintenanceHistory(repo *velerov1api.BackupRepository, result velerov1api.BackupRepositoryMaintenanceResult, startTime, completionTime *metav1.Time, message string) {
latest := velerov1api.BackupRepositoryMaintenanceStatus{
Result: result,
StartTimestamp: &metav1.Time{Time: startTime},
CompleteTimestamp: &metav1.Time{Time: completionTime},
StartTimestamp: startTime,
CompleteTimestamp: completionTime,
Message: message,
}

View File

@@ -19,20 +19,30 @@ import (
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/repository"
repomokes "github.com/vmware-tanzu/velero/pkg/repository/mocks"
repotypes "github.com/vmware-tanzu/velero/pkg/repository/types"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
batchv1 "k8s.io/api/batch/v1"
)
const testMaintenanceFrequency = 10 * time.Minute
@@ -47,9 +57,14 @@ func mockBackupRepoReconciler(t *testing.T, mockOn string, arg interface{}, ret
velerov1api.DefaultNamespace,
velerotest.NewLogger(),
velerotest.NewFakeControllerRuntimeClient(t),
mgr,
testMaintenanceFrequency,
"fake-repo-config",
mgr,
3,
"",
kube.PodResources{},
logrus.InfoLevel,
nil,
)
}
@@ -104,24 +119,71 @@ func TestCheckNotReadyRepo(t *testing.T) {
assert.Equal(t, "s3:test.amazonaws.com/bucket/restic/volume-ns-1", rr.Spec.ResticIdentifier)
}
func startMaintenanceJobFail(client.Client, context.Context, *velerov1api.BackupRepository, string, kube.PodResources, logrus.Level, *logging.FormatFlag, logrus.FieldLogger) (string, error) {
return "", errors.New("fake-start-error")
}
func startMaintenanceJobSucceed(client.Client, context.Context, *velerov1api.BackupRepository, string, kube.PodResources, logrus.Level, *logging.FormatFlag, logrus.FieldLogger) (string, error) {
return "fake-job-name", nil
}
func waitMaintenanceJobCompleteFail(client.Client, context.Context, string, string, logrus.FieldLogger) (velerov1api.BackupRepositoryMaintenanceStatus, error) {
return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.New("fake-wait-error")
}
func waitMaintenanceJobCompleteSucceed(client.Client, context.Context, string, string, logrus.FieldLogger) (velerov1api.BackupRepositoryMaintenanceStatus, error) {
return velerov1api.BackupRepositoryMaintenanceStatus{
StartTimestamp: &metav1.Time{Time: time.Now()},
CompleteTimestamp: &metav1.Time{Time: time.Now().Add(time.Hour)},
}, nil
}
func TestRunMaintenanceIfDue(t *testing.T) {
rr := mockBackupRepositoryCR()
reconciler := mockBackupRepoReconciler(t, "PruneRepo", rr, velerov1api.BackupRepositoryMaintenanceStatus{
StartTimestamp: &metav1.Time{},
CompleteTimestamp: &metav1.Time{},
}, nil)
reconciler := mockBackupRepoReconciler(t, "", rr, nil)
funcStartMaintenanceJob = startMaintenanceJobFail
err := reconciler.Client.Create(context.TODO(), rr)
assert.NoError(t, err)
lastTm := rr.Status.LastMaintenanceTime
err = reconciler.runMaintenanceIfDue(context.TODO(), rr, reconciler.logger)
assert.NoError(t, err)
assert.NotEqual(t, rr.Status.LastMaintenanceTime, lastTm)
rr.Status.LastMaintenanceTime = &metav1.Time{Time: time.Now()}
lastTm = rr.Status.LastMaintenanceTime
history := rr.Status.RecentMaintenance
err = reconciler.runMaintenanceIfDue(context.TODO(), rr, reconciler.logger)
assert.NoError(t, err)
assert.Equal(t, rr.Status.LastMaintenanceTime, lastTm)
assert.NotEqual(t, rr.Status.RecentMaintenance, history)
rr = mockBackupRepositoryCR()
reconciler = mockBackupRepoReconciler(t, "", rr, nil)
funcStartMaintenanceJob = startMaintenanceJobSucceed
funcWaitMaintenanceJobComplete = waitMaintenanceJobCompleteFail
err = reconciler.Client.Create(context.TODO(), rr)
assert.NoError(t, err)
lastTm = rr.Status.LastMaintenanceTime
history = rr.Status.RecentMaintenance
err = reconciler.runMaintenanceIfDue(context.TODO(), rr, reconciler.logger)
assert.EqualError(t, err, "error waiting repo maintenance completion status: fake-wait-error")
assert.Equal(t, rr.Status.LastMaintenanceTime, lastTm)
assert.Equal(t, rr.Status.RecentMaintenance, history)
rr = mockBackupRepositoryCR()
reconciler = mockBackupRepoReconciler(t, "", rr, nil)
funcStartMaintenanceJob = startMaintenanceJobSucceed
funcWaitMaintenanceJobComplete = waitMaintenanceJobCompleteSucceed
err = reconciler.Client.Create(context.TODO(), rr)
assert.NoError(t, err)
lastTm = rr.Status.LastMaintenanceTime
history = rr.Status.RecentMaintenance
err = reconciler.runMaintenanceIfDue(context.TODO(), rr, reconciler.logger)
assert.NoError(t, err)
assert.NotEqual(t, rr.Status.LastMaintenanceTime, lastTm)
assert.NotEqual(t, rr.Status.RecentMaintenance, history)
rr.Status.LastMaintenanceTime = &metav1.Time{Time: time.Now()}
lastTm = rr.Status.LastMaintenanceTime
history = rr.Status.RecentMaintenance
err = reconciler.runMaintenanceIfDue(context.TODO(), rr, reconciler.logger)
assert.NoError(t, err)
assert.Equal(t, rr.Status.LastMaintenanceTime, lastTm)
assert.Equal(t, rr.Status.RecentMaintenance, history)
}
func TestInitializeRepo(t *testing.T) {
@@ -251,9 +313,14 @@ func TestGetRepositoryMaintenanceFrequency(t *testing.T) {
velerov1api.DefaultNamespace,
velerotest.NewLogger(),
velerotest.NewFakeControllerRuntimeClient(t),
&mgr,
test.userDefinedFreq,
"",
&mgr,
3,
"",
kube.PodResources{},
logrus.InfoLevel,
nil,
)
freq := reconciler.getRepositoryMaintenanceFrequency(test.repo)
@@ -380,7 +447,14 @@ func TestNeedInvalidBackupRepo(t *testing.T) {
velerov1api.DefaultNamespace,
velerotest.NewLogger(),
velerotest.NewFakeControllerRuntimeClient(t),
time.Duration(0), "", nil)
nil,
time.Duration(0),
"",
3,
"",
kube.PodResources{},
logrus.InfoLevel,
nil)
need := reconciler.needInvalidBackupRepo(test.oldBSL, test.newBSL)
assert.Equal(t, test.expect, need)
@@ -656,7 +730,7 @@ func TestUpdateRepoMaintenanceHistory(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
updateRepoMaintenanceHistory(test.backupRepo, test.result, standardTime, standardTime.Add(time.Hour), "fake-message-0")
updateRepoMaintenanceHistory(test.backupRepo, test.result, &metav1.Time{Time: standardTime}, &metav1.Time{Time: standardTime.Add(time.Hour)}, "fake-message-0")
for at := range test.backupRepo.Status.RecentMaintenance {
assert.Equal(t, test.expectedHistory[at].StartTimestamp.Time, test.backupRepo.Status.RecentMaintenance[at].StartTimestamp.Time)
@@ -666,3 +740,109 @@ func TestUpdateRepoMaintenanceHistory(t *testing.T) {
})
}
}
func TestRecallMaintenance(t *testing.T) {
now := time.Now()
schemeFail := runtime.NewScheme()
velerov1api.AddToScheme(schemeFail)
scheme := runtime.NewScheme()
batchv1.AddToScheme(scheme)
corev1.AddToScheme(scheme)
velerov1api.AddToScheme(scheme)
jobSucceeded := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job1",
Namespace: velerov1api.DefaultNamespace,
Labels: map[string]string{repository.RepositoryNameLabel: "repo"},
CreationTimestamp: metav1.Time{Time: now.Add(time.Hour)},
},
Status: batchv1.JobStatus{
StartTime: &metav1.Time{Time: now.Add(time.Hour)},
CompletionTime: &metav1.Time{Time: now.Add(time.Hour * 2)},
Succeeded: 1,
},
}
jobPodSucceeded := builder.ForPod(velerov1api.DefaultNamespace, "job1").Labels(map[string]string{"job-name": "job1"}).ContainerStatuses(&v1.ContainerStatus{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{},
},
}).Result()
tests := []struct {
name string
kubeClientObj []runtime.Object
runtimeScheme *runtime.Scheme
repoLastMatainTime metav1.Time
expectNewHistory bool
expectTimeUpdate bool
expectedErr string
}{
{
name: "wait completion error",
runtimeScheme: schemeFail,
expectedErr: "error waiting incomplete repo maintenance job for repo repo: error listing maintenance job for repo repo: no kind is registered for the type v1.JobList in scheme \"pkg/runtime/scheme.go:100\"",
},
{
name: "no consolidate result",
runtimeScheme: scheme,
},
{
name: "no update last time",
runtimeScheme: scheme,
kubeClientObj: []runtime.Object{
jobSucceeded,
jobPodSucceeded,
},
repoLastMatainTime: metav1.Time{Time: now.Add(time.Hour * 5)},
expectNewHistory: true,
},
{
name: "update last time",
runtimeScheme: scheme,
kubeClientObj: []runtime.Object{
jobSucceeded,
jobPodSucceeded,
},
expectNewHistory: true,
expectTimeUpdate: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
r := mockBackupRepoReconciler(t, "", nil, nil)
backupRepo := mockBackupRepositoryCR()
backupRepo.Status.LastMaintenanceTime = &test.repoLastMatainTime
test.kubeClientObj = append(test.kubeClientObj, backupRepo)
fakeClientBuilder := fake.NewClientBuilder()
fakeClientBuilder = fakeClientBuilder.WithScheme(test.runtimeScheme)
fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()
r.Client = fakeClient
lastTm := backupRepo.Status.LastMaintenanceTime
history := backupRepo.Status.RecentMaintenance
err := r.recallMaintenance(context.TODO(), backupRepo, velerotest.NewLogger())
if test.expectedErr != "" {
assert.EqualError(t, err, test.expectedErr)
} else {
assert.NoError(t, err)
if test.expectNewHistory {
assert.NotEqual(t, history, backupRepo.Status.RecentMaintenance)
}
if test.expectTimeUpdate {
assert.NotEqual(t, lastTm, backupRepo.Status.LastMaintenanceTime)
}
}
})
}
}

View File

@@ -35,6 +35,12 @@ import (
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/util/kube"
appsv1 "k8s.io/api/apps/v1"
veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero"
"github.com/vmware-tanzu/velero/pkg/util/logging"
)
const (
@@ -86,29 +92,34 @@ func DeleteOldMaintenanceJobs(cli client.Client, repo string, keep int) error {
return nil
}
// WaitForJobComplete waits for completion of the specified job and updates the latest job object
func WaitForJobComplete(ctx context.Context, client client.Client, ns string, job string) (*batchv1.Job, error) {
updated := &batchv1.Job{}
// waitForJobComplete waits for completion of the specified job and updates the latest job object
func waitForJobComplete(ctx context.Context, client client.Client, ns string, job string) (*batchv1.Job, error) {
var ret *batchv1.Job
err := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
updated := &batchv1.Job{}
err := client.Get(ctx, types.NamespacedName{Namespace: ns, Name: job}, updated)
if err != nil && !apierrors.IsNotFound(err) {
return false, err
}
ret = updated
if updated.Status.Succeeded > 0 {
return true, nil
}
if updated.Status.Failed > 0 {
return true, fmt.Errorf("maintenance job %s/%s failed", job, job)
return true, nil
}
return false, nil
})
return updated, err
return ret, err
}
func GetMaintenanceResultFromJob(cli client.Client, job *batchv1.Job) (string, error) {
func getMaintenanceResultFromJob(cli client.Client, job *batchv1.Job) (string, error) {
// Get the maintenance job related pod by label selector
podList := &v1.PodList{}
err := cli.List(context.TODO(), podList, client.InNamespace(job.Namespace), client.MatchingLabels(map[string]string{"job-name": job.Name}))
@@ -137,7 +148,7 @@ func GetMaintenanceResultFromJob(cli client.Client, job *batchv1.Job) (string, e
return terminated.Message, nil
}
func GetLatestMaintenanceJob(cli client.Client, ns string) (*batchv1.Job, error) {
func getLatestMaintenanceJob(cli client.Client, ns string) (*batchv1.Job, error) {
// Get the maintenance job list by label
jobList := &batchv1.JobList{}
err := cli.List(context.TODO(), jobList, &client.ListOptions{
@@ -162,7 +173,7 @@ func GetLatestMaintenanceJob(cli client.Client, ns string) (*batchv1.Job, error)
return &jobList.Items[0], nil
}
// GetMaintenanceJobConfig is called to get the Maintenance Job Config for the
// getMaintenanceJobConfig is called to get the Maintenance Job Config for the
// BackupRepository specified by the repo parameter.
//
// Params:
@@ -173,7 +184,7 @@ func GetLatestMaintenanceJob(cli client.Client, ns string) (*batchv1.Job, error)
// veleroNamespace: the Velero-installed namespace. It's used to retrieve the BackupRepository.
// repoMaintenanceJobConfig: the repository maintenance job ConfigMap name.
// repo: the BackupRepository needs to run the maintenance Job.
func GetMaintenanceJobConfig(
func getMaintenanceJobConfig(
ctx context.Context,
client client.Client,
logger logrus.FieldLogger,
@@ -255,9 +266,27 @@ func GetMaintenanceJobConfig(
return result, nil
}
// WaitIncompleteMaintenance checks all the incomplete maintenance jobs of the specified repo and wait for them to complete,
func WaitMaintenanceJobComplete(cli client.Client, ctx context.Context, jobName, ns string, logger logrus.FieldLogger) (velerov1api.BackupRepositoryMaintenanceStatus, error) {
log := logger.WithField("job name", jobName)
maintenanceJob, err := waitForJobComplete(ctx, cli, ns, jobName)
if err != nil {
return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.Wrap(err, "error to wait for maintenance job complete")
}
log.Info("Maintenance repo complete")
result, err := getMaintenanceResultFromJob(cli, maintenanceJob)
if err != nil {
log.WithError(err).Warn("Failed to get maintenance job result")
}
return composeMaintenanceStatusFromJob(maintenanceJob, result), nil
}
// WaitAllMaintenanceJobComplete checks all the incomplete maintenance jobs of the specified repo, waits for them to complete,
// and then returns the maintenance jobs within the range of limit
func WaitIncompleteMaintenance(ctx context.Context, cli client.Client, repo *velerov1api.BackupRepository, limit int, log logrus.FieldLogger) ([]velerov1api.BackupRepositoryMaintenanceStatus, error) {
func WaitAllMaintenanceJobComplete(ctx context.Context, cli client.Client, repo *velerov1api.BackupRepository, limit int, log logrus.FieldLogger) ([]velerov1api.BackupRepositoryMaintenanceStatus, error) {
jobList := &batchv1.JobList{}
err := cli.List(context.TODO(), jobList, &client.ListOptions{
Namespace: repo.Namespace,
@@ -290,7 +319,7 @@ func WaitIncompleteMaintenance(ctx context.Context, cli client.Client, repo *vel
if job.Status.Succeeded == 0 && job.Status.Failed == 0 {
log.Infof("Waiting for maintenance job %s to complete", job.Name)
updated, err := WaitForJobComplete(ctx, cli, job.Namespace, job.Name)
updated, err := waitForJobComplete(ctx, cli, job.Namespace, job.Name)
if err != nil {
return nil, errors.Wrapf(err, "error waiting maintenance job[%s] complete", job.Name)
}
@@ -298,18 +327,182 @@ func WaitIncompleteMaintenance(ctx context.Context, cli client.Client, repo *vel
job = updated
}
message, err := GetMaintenanceResultFromJob(cli, job)
message, err := getMaintenanceResultFromJob(cli, job)
if err != nil {
return nil, errors.Wrapf(err, "error getting maintenance job[%s] result", job.Name)
}
history = append(history, ComposeMaintenanceStatusFromJob(job, message))
history = append(history, composeMaintenanceStatusFromJob(job, message))
}
return history, nil
}
func ComposeMaintenanceStatusFromJob(job *batchv1.Job, message string) velerov1api.BackupRepositoryMaintenanceStatus {
func StartMaintenanceJob(cli client.Client, ctx context.Context, repo *velerov1api.BackupRepository, repoMaintenanceJobConfig string,
podResources kube.PodResources, logLevel logrus.Level, logFormat *logging.FormatFlag, logger logrus.FieldLogger) (string, error) {
bsl := &velerov1api.BackupStorageLocation{}
if err := cli.Get(ctx, client.ObjectKey{Namespace: repo.Namespace, Name: repo.Spec.BackupStorageLocation}, bsl); err != nil {
return "", errors.WithStack(err)
}
log := logger.WithFields(logrus.Fields{
"BSL name": bsl.Name,
"repo type": repo.Spec.RepositoryType,
"repo name": repo.Name,
"repo UID": repo.UID,
})
jobConfig, err := getMaintenanceJobConfig(
ctx,
cli,
log,
repo.Namespace,
repoMaintenanceJobConfig,
repo,
)
if err != nil {
log.Warnf("Fail to find the ConfigMap %s to build maintenance job with error: %s. Use default value.",
repo.Namespace+"/"+repoMaintenanceJobConfig,
err.Error(),
)
}
log.Info("Starting maintenance repo")
maintenanceJob, err := buildMaintenanceJob(cli, ctx, repo, bsl.Name, jobConfig, podResources, logLevel, logFormat)
if err != nil {
return "", errors.Wrap(err, "error to build maintenance job")
}
log = log.WithField("job", fmt.Sprintf("%s/%s", maintenanceJob.Namespace, maintenanceJob.Name))
if err := cli.Create(context.TODO(), maintenanceJob); err != nil {
return "", errors.Wrap(err, "error to create maintenance job")
}
log.Info("Repo maintenance job started")
return maintenanceJob.Name, nil
}
func buildMaintenanceJob(cli client.Client, ctx context.Context, repo *velerov1api.BackupRepository, bslName string, config *JobConfigs,
podResources kube.PodResources, logLevel logrus.Level, logFormat *logging.FormatFlag) (*batchv1.Job, error) {
// Get the Velero server deployment
deployment := &appsv1.Deployment{}
err := cli.Get(ctx, types.NamespacedName{Name: "velero", Namespace: repo.Namespace}, deployment)
if err != nil {
return nil, err
}
// Get the environment variables from the Velero server deployment
envVars := veleroutil.GetEnvVarsFromVeleroServer(deployment)
// Get the referenced storage from the Velero server deployment
envFromSources := veleroutil.GetEnvFromSourcesFromVeleroServer(deployment)
// Get the volume mounts from the Velero server deployment
volumeMounts := veleroutil.GetVolumeMountsFromVeleroServer(deployment)
// Get the volumes from the Velero server deployment
volumes := veleroutil.GetVolumesFromVeleroServer(deployment)
// Get the service account from the Velero server deployment
serviceAccount := veleroutil.GetServiceAccountFromVeleroServer(deployment)
// Get image
image := veleroutil.GetVeleroServerImage(deployment)
// Set resource limits and requests
cpuRequest := podResources.CPURequest
memRequest := podResources.MemoryRequest
cpuLimit := podResources.CPULimit
memLimit := podResources.MemoryLimit
if config != nil && config.PodResources != nil {
cpuRequest = config.PodResources.CPURequest
memRequest = config.PodResources.MemoryRequest
cpuLimit = config.PodResources.CPULimit
memLimit = config.PodResources.MemoryLimit
}
resources, err := kube.ParseResourceRequirements(cpuRequest, memRequest, cpuLimit, memLimit)
if err != nil {
return nil, errors.Wrap(err, "failed to parse resource requirements for maintenance job")
}
// Set arguments
args := []string{"repo-maintenance"}
args = append(args, fmt.Sprintf("--repo-name=%s", repo.Spec.VolumeNamespace))
args = append(args, fmt.Sprintf("--repo-type=%s", repo.Spec.RepositoryType))
args = append(args, fmt.Sprintf("--backup-storage-location=%s", bslName))
args = append(args, fmt.Sprintf("--log-level=%s", logLevel.String()))
args = append(args, fmt.Sprintf("--log-format=%s", logFormat.String()))
// build the maintenance job
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: GenerateJobName(repo.Name),
Namespace: repo.Namespace,
Labels: map[string]string{
RepositoryNameLabel: repo.Name,
},
},
Spec: batchv1.JobSpec{
BackoffLimit: new(int32), // Never retry
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "velero-repo-maintenance-pod",
Labels: map[string]string{
RepositoryNameLabel: repo.Name,
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "velero-repo-maintenance-container",
Image: image,
Command: []string{
"/velero",
},
Args: args,
ImagePullPolicy: v1.PullIfNotPresent,
Env: envVars,
EnvFrom: envFromSources,
VolumeMounts: volumeMounts,
Resources: resources,
},
},
RestartPolicy: v1.RestartPolicyNever,
Volumes: volumes,
ServiceAccountName: serviceAccount,
},
},
},
}
if config != nil && len(config.LoadAffinities) > 0 {
affinity := kube.ToSystemAffinity(config.LoadAffinities)
job.Spec.Template.Spec.Affinity = affinity
}
if tolerations := veleroutil.GetTolerationsFromVeleroServer(deployment); tolerations != nil {
job.Spec.Template.Spec.Tolerations = tolerations
}
if nodeSelector := veleroutil.GetNodeSelectorFromVeleroServer(deployment); nodeSelector != nil {
job.Spec.Template.Spec.NodeSelector = nodeSelector
}
if labels := veleroutil.GetVeleroServerLables(deployment); len(labels) > 0 {
job.Spec.Template.Labels = labels
}
if annotations := veleroutil.GetVeleroServerAnnotations(deployment); len(annotations) > 0 {
job.Spec.Template.Annotations = annotations
}
return job, nil
}
func composeMaintenanceStatusFromJob(job *batchv1.Job, message string) velerov1api.BackupRepositoryMaintenanceStatus {
result := velerov1api.BackupRepositoryMaintenanceSucceeded
if job.Status.Failed > 0 {
result = velerov1api.BackupRepositoryMaintenanceFailed

View File

@@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/require"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -35,8 +36,12 @@ import (
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/repository/provider"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/logging"
appsv1 "k8s.io/api/apps/v1"
)
func TestGenerateJobName1(t *testing.T) {
@@ -138,25 +143,45 @@ func TestWaitForJobComplete(t *testing.T) {
Status: batchv1.JobStatus{},
}
schemeFail := runtime.NewScheme()
scheme := runtime.NewScheme()
batchv1.AddToScheme(scheme)
// Define test cases
tests := []struct {
description string // Test case description
jobStatus batchv1.JobStatus // Job status to set for the test
expectError bool // Whether an error is expected
description string // Test case description
kubeClientObj []runtime.Object
runtimeScheme *runtime.Scheme
jobStatus batchv1.JobStatus // Job status to set for the test
expectError bool // Whether an error is expected
}{
{
description: "Job Succeeded",
description: "wait error",
runtimeScheme: schemeFail,
expectError: true,
},
{
description: "Job Succeeded",
runtimeScheme: scheme,
kubeClientObj: []runtime.Object{
job,
},
jobStatus: batchv1.JobStatus{
Succeeded: 1,
},
expectError: false,
},
{
description: "Job Failed",
description: "Job Failed",
runtimeScheme: scheme,
kubeClientObj: []runtime.Object{
job,
},
jobStatus: batchv1.JobStatus{
Failed: 1,
},
expectError: true,
expectError: false,
},
}
@@ -166,9 +191,12 @@ func TestWaitForJobComplete(t *testing.T) {
// Set the job status
job.Status = tc.jobStatus
// Create a fake Kubernetes client
cli := fake.NewClientBuilder().WithObjects(job).Build()
fakeClientBuilder := fake.NewClientBuilder()
fakeClientBuilder = fakeClientBuilder.WithScheme(tc.runtimeScheme)
fakeClient := fakeClientBuilder.WithRuntimeObjects(tc.kubeClientObj...).Build()
// Call the function
_, err := WaitForJobComplete(context.Background(), cli, job.Namespace, job.Name)
_, err := waitForJobComplete(context.Background(), fakeClient, job.Namespace, job.Name)
// Check if the error matches the expectation
if tc.expectError {
@@ -202,7 +230,7 @@ func TestGetMaintenanceResultFromJob(t *testing.T) {
cli := fake.NewClientBuilder().WithObjects(job, pod).Build()
// test an error should be returned
result, err := GetMaintenanceResultFromJob(cli, job)
result, err := getMaintenanceResultFromJob(cli, job)
assert.Error(t, err)
assert.Equal(t, "", result)
@@ -217,7 +245,7 @@ func TestGetMaintenanceResultFromJob(t *testing.T) {
// Test an error should be returned
cli = fake.NewClientBuilder().WithObjects(job, pod).Build()
result, err = GetMaintenanceResultFromJob(cli, job)
result, err = getMaintenanceResultFromJob(cli, job)
assert.Error(t, err)
assert.Equal(t, "", result)
@@ -236,7 +264,7 @@ func TestGetMaintenanceResultFromJob(t *testing.T) {
// This call should return the termination message with no error
cli = fake.NewClientBuilder().WithObjects(job, pod).Build()
result, err = GetMaintenanceResultFromJob(cli, job)
result, err = getMaintenanceResultFromJob(cli, job)
assert.NoError(t, err)
assert.Equal(t, "test message", result)
}
@@ -278,7 +306,7 @@ func TestGetLatestMaintenanceJob(t *testing.T) {
cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build()
// Call the function
job, err := GetLatestMaintenanceJob(cli, "default")
job, err := getLatestMaintenanceJob(cli, "default")
assert.NoError(t, err)
// We expect the returned job to be the newer job
@@ -441,7 +469,7 @@ func TestGetMaintenanceJobConfig(t *testing.T) {
fakeClient = velerotest.NewFakeControllerRuntimeClient(t)
}
jobConfig, err := GetMaintenanceJobConfig(
jobConfig, err := getMaintenanceJobConfig(
ctx,
fakeClient,
logger,
@@ -460,7 +488,7 @@ func TestGetMaintenanceJobConfig(t *testing.T) {
}
}
func TestWaitIncompleteMaintenance(t *testing.T) {
func TestWaitAllMaintenanceJobComplete(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
veleroNamespace := "velero"
@@ -728,7 +756,7 @@ func TestWaitIncompleteMaintenance(t *testing.T) {
fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build()
history, err := WaitIncompleteMaintenance(test.ctx, fakeClient, repo, 3, velerotest.NewLogger())
history, err := WaitAllMaintenanceJobComplete(test.ctx, fakeClient, repo, 3, velerotest.NewLogger())
if test.expectedError != "" {
assert.EqualError(t, err, test.expectedError)
@@ -748,3 +776,205 @@ func TestWaitIncompleteMaintenance(t *testing.T) {
cancel()
}
// TestBuildMaintenanceJob verifies that buildMaintenanceJob derives the
// maintenance job spec from the Velero server deployment (container image,
// Env, EnvFrom) and from the per-repo JobConfigs (pod resources), builds the
// expected CLI args, and returns an error when the server deployment is
// absent from the cluster.
func TestBuildMaintenanceJob(t *testing.T) {
	testCases := []struct {
		name            string
		m               *JobConfigs        // per-repo job config handed to buildMaintenanceJob
		deploy          *appsv1.Deployment // Velero server deployment seeded into the fake client; nil simulates "not found"
		logLevel        logrus.Level
		logFormat       *logging.FormatFlag
		expectedJobName string
		expectedError   bool
		expectedEnv     []v1.EnvVar
		expectedEnvFrom []v1.EnvFromSource
	}{
		{
			// Happy path: deployment exists, so the job inherits its image,
			// env vars and envFrom sources.
			name: "Valid maintenance job",
			m: &JobConfigs{
				PodResources: &kube.PodResources{
					CPURequest:    "100m",
					MemoryRequest: "128Mi",
					CPULimit:      "200m",
					MemoryLimit:   "256Mi",
				},
			},
			deploy: &appsv1.Deployment{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "velero",
					Namespace: "velero",
				},
				Spec: appsv1.DeploymentSpec{
					Template: v1.PodTemplateSpec{
						Spec: v1.PodSpec{
							Containers: []v1.Container{
								{
									Name:  "velero-repo-maintenance-container",
									Image: "velero-image",
									Env: []v1.EnvVar{
										{
											Name:  "test-name",
											Value: "test-value",
										},
									},
									EnvFrom: []v1.EnvFromSource{
										{
											ConfigMapRef: &v1.ConfigMapEnvSource{
												LocalObjectReference: v1.LocalObjectReference{
													Name: "test-configmap",
												},
											},
										},
										{
											SecretRef: &v1.SecretEnvSource{
												LocalObjectReference: v1.LocalObjectReference{
													Name: "test-secret",
												},
											},
										},
									},
								},
							},
						},
					},
				},
			},
			logLevel:        logrus.InfoLevel,
			logFormat:       logging.NewFormatFlag(),
			expectedJobName: "test-123-maintain-job",
			expectedError:   false,
			expectedEnv: []v1.EnvVar{
				{
					Name:  "test-name",
					Value: "test-value",
				},
			},
			expectedEnvFrom: []v1.EnvFromSource{
				{
					ConfigMapRef: &v1.ConfigMapEnvSource{
						LocalObjectReference: v1.LocalObjectReference{
							Name: "test-configmap",
						},
					},
				},
				{
					SecretRef: &v1.SecretEnvSource{
						LocalObjectReference: v1.LocalObjectReference{
							Name: "test-secret",
						},
					},
				},
			},
		},
		{
			// No deployment object is seeded, so the client Get inside
			// buildMaintenanceJob fails and the error must propagate.
			name: "Error getting Velero server deployment",
			m: &JobConfigs{
				PodResources: &kube.PodResources{
					CPURequest:    "100m",
					MemoryRequest: "128Mi",
					CPULimit:      "200m",
					MemoryLimit:   "256Mi",
				},
			},
			logLevel:        logrus.InfoLevel,
			logFormat:       logging.NewFormatFlag(),
			expectedJobName: "",
			expectedError:   true,
		},
	}

	// Repository/location parameters shared by all test cases.
	param := provider.RepoParam{
		BackupRepo: &velerov1api.BackupRepository{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: "velero",
				Name:      "test-123",
			},
			Spec: velerov1api.BackupRepositorySpec{
				VolumeNamespace: "test-123",
				RepositoryType:  "kopia",
			},
		},
		BackupLocation: &velerov1api.BackupStorageLocation{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: "velero",
				Name:      "test-location",
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Create a fake clientset with resources
			objs := []runtime.Object{param.BackupLocation, param.BackupRepo}

			if tc.deploy != nil {
				objs = append(objs, tc.deploy)
			}
			scheme := runtime.NewScheme()
			_ = appsv1.AddToScheme(scheme)
			_ = velerov1api.AddToScheme(scheme)
			cli := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objs...).Build()

			// Call the function to test
			job, err := buildMaintenanceJob(cli, context.TODO(), param.BackupRepo, param.BackupLocation.Name, tc.m, *tc.m.PodResources, tc.logLevel, tc.logFormat)

			// Check the error
			if tc.expectedError {
				assert.Error(t, err)
				assert.Nil(t, job)
			} else {
				assert.NoError(t, err)
				assert.NotNil(t, job)
				// Job name is generated from the repo name plus a suffix, so
				// only a substring match is asserted here.
				assert.Contains(t, job.Name, tc.expectedJobName)
				assert.Equal(t, param.BackupRepo.Namespace, job.Namespace)
				assert.Equal(t, param.BackupRepo.Name, job.Labels[RepositoryNameLabel])

				assert.Equal(t, param.BackupRepo.Name, job.Spec.Template.ObjectMeta.Labels[RepositoryNameLabel])

				// Check container
				assert.Len(t, job.Spec.Template.Spec.Containers, 1)
				container := job.Spec.Template.Spec.Containers[0]
				assert.Equal(t, "velero-repo-maintenance-container", container.Name)
				assert.Equal(t, "velero-image", container.Image)
				assert.Equal(t, v1.PullIfNotPresent, container.ImagePullPolicy)

				// Check container env
				assert.Equal(t, tc.expectedEnv, container.Env)
				assert.Equal(t, tc.expectedEnvFrom, container.EnvFrom)

				// Check resources: the values from JobConfigs.PodResources must
				// be what ends up in the container's requests/limits.
				expectedResources := v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse(tc.m.PodResources.CPURequest),
						v1.ResourceMemory: resource.MustParse(tc.m.PodResources.MemoryRequest),
					},
					Limits: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse(tc.m.PodResources.CPULimit),
						v1.ResourceMemory: resource.MustParse(tc.m.PodResources.MemoryLimit),
					},
				}
				assert.Equal(t, expectedResources, container.Resources)

				// Check args
				expectedArgs := []string{
					"repo-maintenance",
					fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace),
					fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType),
					fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name),
					fmt.Sprintf("--log-level=%s", tc.logLevel.String()),
					fmt.Sprintf("--log-format=%s", tc.logFormat.String()),
				}
				assert.Equal(t, expectedArgs, container.Args)

				// Check affinity
				assert.Nil(t, job.Spec.Template.Spec.Affinity)

				// Check tolerations
				assert.Nil(t, job.Spec.Template.Spec.Tolerations)

				// Check node selector
				assert.Nil(t, job.Spec.Template.Spec.NodeSelector)
			}
		})
	}
}

View File

@@ -23,11 +23,6 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
@@ -35,9 +30,6 @@ import (
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/repository/provider"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/logging"
veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero"
)
// Manager manages backup repositories.
@@ -54,7 +46,7 @@ type Manager interface {
PrepareRepo(repo *velerov1api.BackupRepository) error
// PruneRepo deletes unused data from a repo.
PruneRepo(repo *velerov1api.BackupRepository) (velerov1api.BackupRepositoryMaintenanceStatus, error)
PruneRepo(repo *velerov1api.BackupRepository) error
// UnlockRepo removes stale locks from a repo.
UnlockRepo(repo *velerov1api.BackupRepository) error
@@ -76,16 +68,10 @@ type manager struct {
providers map[string]provider.Provider
// client is the Velero controller manager's client.
// It's limited to resources in the Velero namespace.
client client.Client
repoLocker *repository.RepoLocker
repoEnsurer *repository.Ensurer
fileSystem filesystem.Interface
repoMaintenanceJobConfig string
podResources kube.PodResources
keepLatestMaintenanceJobs int
log logrus.FieldLogger
logLevel logrus.Level
logFormat *logging.FormatFlag
client client.Client
repoLocker *repository.RepoLocker
fileSystem filesystem.Interface
log logrus.FieldLogger
}
// NewManager create a new repository manager.
@@ -93,29 +79,17 @@ func NewManager(
namespace string,
client client.Client,
repoLocker *repository.RepoLocker,
repoEnsurer *repository.Ensurer,
credentialFileStore credentials.FileStore,
credentialSecretStore credentials.SecretStore,
repoMaintenanceJobConfig string,
podResources kube.PodResources,
keepLatestMaintenanceJobs int,
log logrus.FieldLogger,
logLevel logrus.Level,
logFormat *logging.FormatFlag,
) Manager {
mgr := &manager{
namespace: namespace,
client: client,
providers: map[string]provider.Provider{},
repoLocker: repoLocker,
repoEnsurer: repoEnsurer,
fileSystem: filesystem.NewFileSystem(),
repoMaintenanceJobConfig: repoMaintenanceJobConfig,
podResources: podResources,
keepLatestMaintenanceJobs: keepLatestMaintenanceJobs,
log: log,
logLevel: logLevel,
logFormat: logFormat,
namespace: namespace,
client: client,
providers: map[string]provider.Provider{},
repoLocker: repoLocker,
fileSystem: filesystem.NewFileSystem(),
log: log,
}
mgr.providers[velerov1api.BackupRepositoryTypeRestic] = provider.NewResticRepositoryProvider(credentialFileStore, mgr.fileSystem, mgr.log)
@@ -172,86 +146,24 @@ func (m *manager) PrepareRepo(repo *velerov1api.BackupRepository) error {
return prd.PrepareRepo(context.Background(), param)
}
func (m *manager) PruneRepo(repo *velerov1api.BackupRepository) (velerov1api.BackupRepositoryMaintenanceStatus, error) {
func (m *manager) PruneRepo(repo *velerov1api.BackupRepository) error {
m.repoLocker.LockExclusive(repo.Name)
defer m.repoLocker.UnlockExclusive(repo.Name)
prd, err := m.getRepositoryProvider(repo)
if err != nil {
return errors.WithStack(err)
}
param, err := m.assembleRepoParam(repo)
if err != nil {
return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.WithStack(err)
return errors.WithStack(err)
}
log := m.log.WithFields(logrus.Fields{
"BSL name": param.BackupLocation.Name,
"repo type": param.BackupRepo.Spec.RepositoryType,
"repo name": param.BackupRepo.Name,
"repo UID": param.BackupRepo.UID,
})
job, err := repository.GetLatestMaintenanceJob(m.client, m.namespace)
if err != nil {
return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.WithStack(err)
if err := prd.BoostRepoConnect(context.Background(), param); err != nil {
return errors.WithStack(err)
}
if job != nil && job.Status.Succeeded == 0 && job.Status.Failed == 0 {
log.Debugf("There already has a unfinished maintenance job %s/%s for repository %s, please wait for it to complete", job.Namespace, job.Name, param.BackupRepo.Name)
return velerov1api.BackupRepositoryMaintenanceStatus{}, nil
}
jobConfig, err := repository.GetMaintenanceJobConfig(
context.Background(),
m.client,
m.log,
m.namespace,
m.repoMaintenanceJobConfig,
repo,
)
if err != nil {
log.Infof("Fail to find the ConfigMap %s to build maintenance job with error: %s. Use default value.",
m.namespace+"/"+m.repoMaintenanceJobConfig,
err.Error(),
)
}
log.Info("Start to maintenance repo")
maintenanceJob, err := m.buildMaintenanceJob(
jobConfig,
param,
)
if err != nil {
return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.Wrap(err, "error to build maintenance job")
}
log = log.WithField("job", fmt.Sprintf("%s/%s", maintenanceJob.Namespace, maintenanceJob.Name))
if err := m.client.Create(context.TODO(), maintenanceJob); err != nil {
return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.Wrap(err, "error to create maintenance job")
}
log.Debug("Creating maintenance job")
defer func() {
if err := repository.DeleteOldMaintenanceJobs(
m.client,
param.BackupRepo.Name,
m.keepLatestMaintenanceJobs,
); err != nil {
log.WithError(err).Error("Failed to delete maintenance job")
}
}()
maintenanceJob, err = repository.WaitForJobComplete(context.TODO(), m.client, maintenanceJob.Namespace, maintenanceJob.Name)
if err != nil {
return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.Wrap(err, "error to wait for maintenance job complete")
}
result, err := repository.GetMaintenanceResultFromJob(m.client, maintenanceJob)
if err != nil {
return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.Wrap(err, "error to get maintenance job result")
}
log.Info("Maintenance repo complete")
return repository.ComposeMaintenanceStatusFromJob(maintenanceJob, result), nil
return prd.PruneRepo(context.Background(), param)
}
func (m *manager) UnlockRepo(repo *velerov1api.BackupRepository) error {
@@ -344,122 +256,3 @@ func (m *manager) assembleRepoParam(repo *velerov1api.BackupRepository) (provide
BackupRepo: repo,
}, nil
}
// buildMaintenanceJob assembles a batch Job that runs `velero repo-maintenance`
// for the repository described by param. The job intentionally mirrors the
// Velero server deployment: it reuses its image, env vars, envFrom sources,
// volumes/volume mounts, service account, tolerations and node selector, so
// the maintenance pod runs with the same configuration as the server.
//
// Pod resources default to m.podResources and are overridden by the per-repo
// JobConfigs when provided. The returned job is only constructed, not created
// in the cluster; the caller submits it.
//
// Returns an error when the "velero" server deployment cannot be fetched or
// when the resource quantities fail to parse.
func (m *manager) buildMaintenanceJob(
	config *repository.JobConfigs,
	param provider.RepoParam,
) (*batchv1.Job, error) {
	// Get the Velero server deployment
	deployment := &appsv1.Deployment{}
	err := m.client.Get(context.TODO(), types.NamespacedName{Name: "velero", Namespace: m.namespace}, deployment)
	if err != nil {
		return nil, err
	}

	// Get the environment variables from the Velero server deployment
	envVars := veleroutil.GetEnvVarsFromVeleroServer(deployment)

	// Get the referenced storage from the Velero server deployment
	envFromSources := veleroutil.GetEnvFromSourcesFromVeleroServer(deployment)

	// Get the volume mounts from the Velero server deployment
	volumeMounts := veleroutil.GetVolumeMountsFromVeleroServer(deployment)

	// Get the volumes from the Velero server deployment
	volumes := veleroutil.GetVolumesFromVeleroServer(deployment)

	// Get the service account from the Velero server deployment
	serviceAccount := veleroutil.GetServiceAccountFromVeleroServer(deployment)

	// Get image
	image := veleroutil.GetVeleroServerImage(deployment)

	// Set resource limits and requests: manager-wide defaults first, then the
	// per-repository job config takes precedence when present.
	cpuRequest := m.podResources.CPURequest
	memRequest := m.podResources.MemoryRequest
	cpuLimit := m.podResources.CPULimit
	memLimit := m.podResources.MemoryLimit
	if config != nil && config.PodResources != nil {
		cpuRequest = config.PodResources.CPURequest
		memRequest = config.PodResources.MemoryRequest
		cpuLimit = config.PodResources.CPULimit
		memLimit = config.PodResources.MemoryLimit
	}

	resources, err := kube.ParseResourceRequirements(cpuRequest, memRequest, cpuLimit, memLimit)
	if err != nil {
		return nil, errors.Wrap(err, "failed to parse resource requirements for maintenance job")
	}

	// Set arguments for the repo-maintenance sub-command.
	args := []string{"repo-maintenance"}
	args = append(args, fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace))
	args = append(args, fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType))
	args = append(args, fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name))
	args = append(args, fmt.Sprintf("--log-level=%s", m.logLevel.String()))
	args = append(args, fmt.Sprintf("--log-format=%s", m.logFormat.String()))

	// build the maintenance job
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      repository.GenerateJobName(param.BackupRepo.Name),
			Namespace: param.BackupRepo.Namespace,
			Labels: map[string]string{
				repository.RepositoryNameLabel: param.BackupRepo.Name,
			},
		},
		Spec: batchv1.JobSpec{
			BackoffLimit: new(int32), // Never retry
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Name: "velero-repo-maintenance-pod",
					Labels: map[string]string{
						repository.RepositoryNameLabel: param.BackupRepo.Name,
					},
				},
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:  "velero-repo-maintenance-container",
							Image: image,
							Command: []string{
								"/velero",
							},
							Args:            args,
							ImagePullPolicy: v1.PullIfNotPresent,
							Env:             envVars,
							EnvFrom:         envFromSources,
							VolumeMounts:    volumeMounts,
							Resources:       resources,
						},
					},
					RestartPolicy:      v1.RestartPolicyNever,
					Volumes:            volumes,
					ServiceAccountName: serviceAccount,
				},
			},
		},
	}

	if config != nil && len(config.LoadAffinities) > 0 {
		affinity := kube.ToSystemAffinity(config.LoadAffinities)
		job.Spec.Template.Spec.Affinity = affinity
	}

	if tolerations := veleroutil.GetTolerationsFromVeleroServer(deployment); tolerations != nil {
		job.Spec.Template.Spec.Tolerations = tolerations
	}

	if nodeSelector := veleroutil.GetNodeSelectorFromVeleroServer(deployment); nodeSelector != nil {
		job.Spec.Template.Spec.NodeSelector = nodeSelector
	}

	if labels := veleroutil.GetVeleroServerLables(deployment); len(labels) > 0 {
		// Merge the server's labels into the pod template instead of replacing
		// the whole map: the previous wholesale assignment dropped the
		// RepositoryNameLabel set above, losing the job->repository association
		// on the pod. Existing template labels win on key conflicts.
		for k, v := range labels {
			if _, exist := job.Spec.Template.Labels[k]; !exist {
				job.Spec.Template.Labels[k] = v
			}
		}
	}

	if annotations := veleroutil.GetVeleroServerAnnotations(deployment); len(annotations) > 0 {
		job.Spec.Template.Annotations = annotations
	}

	return job, nil
}

View File

@@ -17,31 +17,18 @@ limitations under the License.
package repository
import (
"fmt"
"testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/repository/provider"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/logging"
)
func TestGetRepositoryProvider(t *testing.T) {
var fakeClient kbclient.Client
mgr := NewManager("", fakeClient, nil, nil, nil, nil, "", kube.PodResources{}, 3, nil, logrus.InfoLevel, nil).(*manager)
mgr := NewManager("", fakeClient, nil, nil, nil, nil).(*manager)
repo := &velerov1.BackupRepository{}
// empty repository type
@@ -60,220 +47,3 @@ func TestGetRepositoryProvider(t *testing.T) {
_, err = mgr.getRepositoryProvider(repo)
require.Error(t, err)
}
// TestBuildMaintenanceJob verifies that manager.buildMaintenanceJob derives
// the maintenance job spec from the Velero server deployment (container
// image, Env, EnvFrom) and from the per-repo JobConfigs (pod resources),
// builds the expected CLI args, and returns an error when the server
// deployment is absent from the cluster.
func TestBuildMaintenanceJob(t *testing.T) {
	testCases := []struct {
		name            string
		m               *repository.JobConfigs // per-repo job config handed to buildMaintenanceJob
		deploy          *appsv1.Deployment     // Velero server deployment seeded into the fake client; nil simulates "not found"
		logLevel        logrus.Level
		logFormat       *logging.FormatFlag
		expectedJobName string
		expectedError   bool
		expectedEnv     []v1.EnvVar
		expectedEnvFrom []v1.EnvFromSource
	}{
		{
			// Happy path: deployment exists, so the job inherits its image,
			// env vars and envFrom sources.
			name: "Valid maintenance job",
			m: &repository.JobConfigs{
				PodResources: &kube.PodResources{
					CPURequest:    "100m",
					MemoryRequest: "128Mi",
					CPULimit:      "200m",
					MemoryLimit:   "256Mi",
				},
			},
			deploy: &appsv1.Deployment{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "velero",
					Namespace: "velero",
				},
				Spec: appsv1.DeploymentSpec{
					Template: v1.PodTemplateSpec{
						Spec: v1.PodSpec{
							Containers: []v1.Container{
								{
									Name:  "velero-repo-maintenance-container",
									Image: "velero-image",
									Env: []v1.EnvVar{
										{
											Name:  "test-name",
											Value: "test-value",
										},
									},
									EnvFrom: []v1.EnvFromSource{
										{
											ConfigMapRef: &v1.ConfigMapEnvSource{
												LocalObjectReference: v1.LocalObjectReference{
													Name: "test-configmap",
												},
											},
										},
										{
											SecretRef: &v1.SecretEnvSource{
												LocalObjectReference: v1.LocalObjectReference{
													Name: "test-secret",
												},
											},
										},
									},
								},
							},
						},
					},
				},
			},
			logLevel:        logrus.InfoLevel,
			logFormat:       logging.NewFormatFlag(),
			expectedJobName: "test-123-maintain-job",
			expectedError:   false,
			expectedEnv: []v1.EnvVar{
				{
					Name:  "test-name",
					Value: "test-value",
				},
			},
			expectedEnvFrom: []v1.EnvFromSource{
				{
					ConfigMapRef: &v1.ConfigMapEnvSource{
						LocalObjectReference: v1.LocalObjectReference{
							Name: "test-configmap",
						},
					},
				},
				{
					SecretRef: &v1.SecretEnvSource{
						LocalObjectReference: v1.LocalObjectReference{
							Name: "test-secret",
						},
					},
				},
			},
		},
		{
			// No deployment object is seeded, so the client Get inside
			// buildMaintenanceJob fails and the error must propagate.
			name: "Error getting Velero server deployment",
			m: &repository.JobConfigs{
				PodResources: &kube.PodResources{
					CPURequest:    "100m",
					MemoryRequest: "128Mi",
					CPULimit:      "200m",
					MemoryLimit:   "256Mi",
				},
			},
			logLevel:        logrus.InfoLevel,
			logFormat:       logging.NewFormatFlag(),
			expectedJobName: "",
			expectedError:   true,
		},
	}

	// Repository/location parameters shared by all test cases.
	param := provider.RepoParam{
		BackupRepo: &velerov1api.BackupRepository{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: "velero",
				Name:      "test-123",
			},
			Spec: velerov1api.BackupRepositorySpec{
				VolumeNamespace: "test-123",
				RepositoryType:  "kopia",
			},
		},
		BackupLocation: &velerov1api.BackupStorageLocation{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: "velero",
				Name:      "test-location",
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Create a fake clientset with resources
			objs := []runtime.Object{param.BackupLocation, param.BackupRepo}

			if tc.deploy != nil {
				objs = append(objs, tc.deploy)
			}
			scheme := runtime.NewScheme()
			_ = appsv1.AddToScheme(scheme)
			_ = velerov1api.AddToScheme(scheme)
			cli := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objs...).Build()

			// Build a manager around the fake client; only the fields used by
			// buildMaintenanceJob matter here, the rest are nil/zero.
			mgr := NewManager(
				"velero",
				cli,
				nil,
				nil,
				nil,
				nil,
				"",
				kube.PodResources{},
				3,
				nil,
				logrus.InfoLevel,
				logging.NewFormatFlag(),
			).(*manager)

			// Call the function to test
			job, err := mgr.buildMaintenanceJob(tc.m, param)

			// Check the error
			if tc.expectedError {
				assert.Error(t, err)
				assert.Nil(t, job)
			} else {
				assert.NoError(t, err)
				assert.NotNil(t, job)
				// Job name is generated from the repo name plus a suffix, so
				// only a substring match is asserted here.
				assert.Contains(t, job.Name, tc.expectedJobName)
				assert.Equal(t, param.BackupRepo.Namespace, job.Namespace)
				assert.Equal(t, param.BackupRepo.Name, job.Labels[repository.RepositoryNameLabel])

				assert.Equal(t, param.BackupRepo.Name, job.Spec.Template.ObjectMeta.Labels[repository.RepositoryNameLabel])

				// Check container
				assert.Len(t, job.Spec.Template.Spec.Containers, 1)
				container := job.Spec.Template.Spec.Containers[0]
				assert.Equal(t, "velero-repo-maintenance-container", container.Name)
				assert.Equal(t, "velero-image", container.Image)
				assert.Equal(t, v1.PullIfNotPresent, container.ImagePullPolicy)

				// Check container env
				assert.Equal(t, tc.expectedEnv, container.Env)
				assert.Equal(t, tc.expectedEnvFrom, container.EnvFrom)

				// Check resources: the values from JobConfigs.PodResources must
				// be what ends up in the container's requests/limits.
				expectedResources := v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse(tc.m.PodResources.CPURequest),
						v1.ResourceMemory: resource.MustParse(tc.m.PodResources.MemoryRequest),
					},
					Limits: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse(tc.m.PodResources.CPULimit),
						v1.ResourceMemory: resource.MustParse(tc.m.PodResources.MemoryLimit),
					},
				}
				assert.Equal(t, expectedResources, container.Resources)

				// Check args
				expectedArgs := []string{
					"repo-maintenance",
					fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace),
					fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType),
					fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name),
					fmt.Sprintf("--log-level=%s", tc.logLevel.String()),
					fmt.Sprintf("--log-format=%s", tc.logFormat.String()),
				}
				assert.Equal(t, expectedArgs, container.Args)

				// Check affinity
				assert.Nil(t, job.Spec.Template.Spec.Affinity)

				// Check tolerations
				assert.Nil(t, job.Spec.Template.Spec.Tolerations)

				// Check node selector
				assert.Nil(t, job.Spec.Template.Spec.NodeSelector)
			}
		})
	}
}

View File

@@ -138,31 +138,21 @@ func (_m *Manager) PrepareRepo(repo *v1.BackupRepository) error {
}
// PruneRepo provides a mock function with given fields: repo
func (_m *Manager) PruneRepo(repo *v1.BackupRepository) (v1.BackupRepositoryMaintenanceStatus, error) {
func (_m *Manager) PruneRepo(repo *v1.BackupRepository) error {
ret := _m.Called(repo)
if len(ret) == 0 {
panic("no return value specified for PruneRepo")
}
var r0 v1.BackupRepositoryMaintenanceStatus
var r1 error
if rf, ok := ret.Get(0).(func(*v1.BackupRepository) (v1.BackupRepositoryMaintenanceStatus, error)); ok {
return rf(repo)
}
if rf, ok := ret.Get(0).(func(*v1.BackupRepository) v1.BackupRepositoryMaintenanceStatus); ok {
var r0 error
if rf, ok := ret.Get(0).(func(*v1.BackupRepository) error); ok {
r0 = rf(repo)
} else {
r0 = ret.Get(0).(v1.BackupRepositoryMaintenanceStatus)
r0 = ret.Error(0)
}
if rf, ok := ret.Get(1).(func(*v1.BackupRepository) error); ok {
r1 = rf(repo)
} else {
r1 = ret.Error(1)
}
return r0, r1
return r0
}
// UnlockRepo provides a mock function with given fields: repo