Merge pull request #7451 from qiuming-best/maintenance-job

Add repository maintenance job
qiuming
2024-03-28 14:47:15 +08:00
committed by GitHub
18 changed files with 1721 additions and 29 deletions


@@ -0,0 +1 @@
Add repository maintenance job


@@ -23,6 +23,7 @@ import (
"strings"
"time"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/pkg/errors"
@@ -84,6 +85,7 @@ type Options struct {
DefaultSnapshotMoveData bool
DisableInformerCache bool
ScheduleSkipImmediately bool
MaintenanceCfg repository.MaintenanceConfig
}
// BindFlags adds command line values to the options struct.
@@ -128,6 +130,11 @@ func (o *Options) BindFlags(flags *pflag.FlagSet) {
flags.BoolVar(&o.DefaultSnapshotMoveData, "default-snapshot-move-data", o.DefaultSnapshotMoveData, "Bool flag to configure Velero server to move data by default for all snapshots supporting data movement. Optional.")
flags.BoolVar(&o.DisableInformerCache, "disable-informer-cache", o.DisableInformerCache, "Disable informer cache for Get calls on restore. With this enabled, it will speed up restore in cases where there are backup resources which already exist in the cluster, but for very large clusters this will increase velero memory usage. Default is false (don't disable). Optional.")
flags.BoolVar(&o.ScheduleSkipImmediately, "schedule-skip-immediately", o.ScheduleSkipImmediately, "Skip the first scheduled backup immediately after creating a schedule. Default is false (don't skip).")
flags.IntVar(&o.MaintenanceCfg.KeepLatestMaitenanceJobs, "keep-latest-maintenance-jobs", o.MaintenanceCfg.KeepLatestMaitenanceJobs, "Number of latest maintenance jobs to keep for each repository. Optional.")
flags.StringVar(&o.MaintenanceCfg.CPURequest, "maintenance-job-cpu-request", o.MaintenanceCfg.CPURequest, "CPU request for maintenance jobs. Default is no limit.")
flags.StringVar(&o.MaintenanceCfg.MemRequest, "maintenance-job-mem-request", o.MaintenanceCfg.MemRequest, "Memory request for maintenance jobs. Default is no limit.")
flags.StringVar(&o.MaintenanceCfg.CPULimit, "maintenance-job-cpu-limit", o.MaintenanceCfg.CPULimit, "CPU limit for maintenance jobs. Default is no limit.")
flags.StringVar(&o.MaintenanceCfg.MemLimit, "maintenance-job-mem-limit", o.MaintenanceCfg.MemLimit, "Memory limit for maintenance jobs. Default is no limit.")
}
// NewInstallOptions instantiates a new, default InstallOptions struct.
@@ -157,6 +164,9 @@ func NewInstallOptions() *Options {
DefaultSnapshotMoveData: false,
DisableInformerCache: false,
ScheduleSkipImmediately: false,
MaintenanceCfg: repository.MaintenanceConfig{
KeepLatestMaitenanceJobs: repository.DefaultKeepLatestMaitenanceJobs,
},
}
}
@@ -224,6 +234,7 @@ func (o *Options) AsVeleroOptions() (*install.VeleroOptions, error) {
DefaultSnapshotMoveData: o.DefaultSnapshotMoveData,
DisableInformerCache: o.DisableInformerCache,
ScheduleSkipImmediately: o.ScheduleSkipImmediately,
MaintenanceCfg: o.MaintenanceCfg,
}, nil
}
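A minimal, self-contained sketch (not part of this change) of how the new maintenance flags parse, using a stand-in struct that mirrors repository.MaintenanceConfig; the flag names and defaults are copied from the BindFlags additions above, and the parsed values are invented:

package main

import (
    "fmt"

    "github.com/spf13/pflag"
)

// maintenanceConfig is a stand-in mirroring repository.MaintenanceConfig.
type maintenanceConfig struct {
    KeepLatestMaintenanceJobs int
    CPURequest, MemRequest    string
    CPULimit, MemLimit        string
}

func main() {
    cfg := maintenanceConfig{KeepLatestMaintenanceJobs: 3} // default: keep the 3 newest jobs per repository
    fs := pflag.NewFlagSet("example", pflag.ExitOnError)
    fs.IntVar(&cfg.KeepLatestMaintenanceJobs, "keep-latest-maintenance-jobs", cfg.KeepLatestMaintenanceJobs, "Number of latest maintenance jobs to keep for each repository. Optional.")
    fs.StringVar(&cfg.CPURequest, "maintenance-job-cpu-request", cfg.CPURequest, "CPU request for maintenance jobs. Default is no limit.")
    fs.StringVar(&cfg.MemLimit, "maintenance-job-mem-limit", cfg.MemLimit, "Memory limit for maintenance jobs. Default is no limit.")
    _ = fs.Parse([]string{
        "--keep-latest-maintenance-jobs=5",
        "--maintenance-job-cpu-request=100m",
        "--maintenance-job-mem-limit=512Mi",
    })
    fmt.Printf("%+v\n", cfg) // {KeepLatestMaintenanceJobs:5 CPURequest:100m MemRequest: CPULimit: MemLimit:512Mi}
}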


@@ -0,0 +1,179 @@
package repomantenance
import (
"context"
"fmt"
"os"
"strings"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerocli "github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/repository/provider"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"
)
type Options struct {
RepoName string
BackupStorageLocation string
RepoType string
LogLevelFlag *logging.LevelFlag
FormatFlag *logging.FormatFlag
}
func (o *Options) BindFlags(flags *pflag.FlagSet) {
flags.StringVar(&o.RepoName, "repo-name", "", "namespace of the pod/volume that the snapshot is for; identifies the backup repository to maintain")
flags.StringVar(&o.BackupStorageLocation, "backup-storage-location", "", "backup's storage location name")
flags.StringVar(&o.RepoType, "repo-type", velerov1api.BackupRepositoryTypeKopia, "type of the repository where the snapshot is stored")
flags.Var(o.LogLevelFlag, "log-level", fmt.Sprintf("The level at which to log. Valid values are %s.", strings.Join(o.LogLevelFlag.AllowedValues(), ", ")))
flags.Var(o.FormatFlag, "log-format", fmt.Sprintf("The format for log output. Valid values are %s.", strings.Join(o.FormatFlag.AllowedValues(), ", ")))
}
func NewCommand(f velerocli.Factory) *cobra.Command {
o := &Options{
LogLevelFlag: logging.LogLevelFlag(logrus.InfoLevel),
FormatFlag: logging.NewFormatFlag(),
}
cmd := &cobra.Command{
Use: "repo-maintenance",
Hidden: true,
Short: "VELERO INTERNAL COMMAND ONLY - not intended to be run directly by users",
Run: func(c *cobra.Command, args []string) {
o.Run(f)
},
}
o.BindFlags(cmd.Flags())
return cmd
}
func (o *Options) Run(f velerocli.Factory) {
logger := logging.DefaultLogger(o.LogLevelFlag.Parse(), o.FormatFlag.Parse())
logger.SetOutput(os.Stdout)
pruneError := o.runRepoPrune(f, f.Namespace(), logger)
defer func() {
if pruneError != nil {
os.Exit(1)
}
}()
if pruneError != nil {
logger.WithError(pruneError).Error("An error occurred when running repo prune")
terminationLogFile, err := os.Create("/dev/termination-log")
if err != nil {
logger.WithError(err).Error("Failed to create termination log file")
return
}
defer terminationLogFile.Close()
if _, errWrite := terminationLogFile.WriteString(fmt.Sprintf("An error occurred: %v", pruneError)); errWrite != nil {
logger.WithError(errWrite).Error("Failed to write error to termination log file")
}
}
}
func (o *Options) initClient(f velerocli.Factory) (client.Client, error) {
scheme := runtime.NewScheme()
err := velerov1api.AddToScheme(scheme)
if err != nil {
return nil, errors.Wrap(err, "failed to add velero scheme")
}
err = v1.AddToScheme(scheme)
if err != nil {
return nil, errors.Wrap(err, "failed to add api core scheme")
}
config, err := f.ClientConfig()
if err != nil {
return nil, errors.Wrap(err, "failed to get client config")
}
cli, err := client.New(config, client.Options{
Scheme: scheme,
})
if err != nil {
return nil, errors.Wrap(err, "failed to create client")
}
return cli, nil
}
func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger logrus.FieldLogger) error {
cli, err := o.initClient(f)
if err != nil {
return err
}
credentialFileStore, err := credentials.NewNamespacedFileStore(
cli,
namespace,
"/tmp/credentials",
filesystem.NewFileSystem(),
)
if err != nil {
return errors.Wrap(err, "failed to create namespaced file store")
}
credentialSecretStore, err := credentials.NewNamespacedSecretStore(cli, namespace)
if err != nil {
return errors.Wrap(err, "failed to create namespaced secret store")
}
var repoProvider provider.Provider
if o.RepoType == velerov1api.BackupRepositoryTypeRestic {
repoProvider = provider.NewResticRepositoryProvider(credentialFileStore, filesystem.NewFileSystem(), logger)
} else {
repoProvider = provider.NewUnifiedRepoProvider(
credentials.CredentialGetter{
FromFile: credentialFileStore,
FromSecret: credentialSecretStore,
}, o.RepoType, cli, logger)
}
// Get the BackupRepository for the given volume namespace, storage location and repo type
repo, err := repository.GetBackupRepository(context.Background(), cli, namespace,
repository.BackupRepositoryKey{
VolumeNamespace: o.RepoName,
BackupLocation: o.BackupStorageLocation,
RepositoryType: o.RepoType,
}, true)
if err != nil {
return errors.Wrap(err, "failed to get backup repository")
}
// Get the BackupStorageLocation referenced by the repository
bsl := &velerov1api.BackupStorageLocation{}
err = cli.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: repo.Spec.BackupStorageLocation}, bsl)
if err != nil {
return errors.Wrap(err, "failed to get backup storage location")
}
para := provider.RepoParam{
BackupRepo: repo,
BackupLocation: bsl,
}
err = repoProvider.BoostRepoConnect(context.Background(), para)
if err != nil {
return errors.Wrap(err, "failed to boost repo connect")
}
err = repoProvider.PruneRepo(context.Background(), para)
if err != nil {
return errors.Wrap(err, "failed to prune repo")
}
return nil
}
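For orientation, a hedged sketch of the argument list that the maintenance Job (built by buildMaintenanceJob later in this PR) passes to this hidden command; the repository, location, and log values below are invented for illustration:

package main

import "fmt"

func main() {
    // Illustrative values only; the real arguments are assembled from the
    // BackupRepository spec and the server's log flags.
    args := []string{
        "repo-maintenance",
        "--repo-name=nginx-example", // the BackupRepository's .spec.volumeNamespace
        "--repo-type=kopia",
        "--backup-storage-location=default",
        "--log-level=info",
        "--log-format=text",
    }
    fmt.Println(args)
}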


@@ -32,6 +32,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
appsv1 "k8s.io/api/apps/v1"
batchv1api "k8s.io/api/batch/v1"
corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -136,6 +138,7 @@ type serverConfig struct {
defaultSnapshotMoveData bool
disableInformerCache bool
scheduleSkipImmediately bool
maintenanceCfg repository.MaintenanceConfig
}
func NewCommand(f client.Factory) *cobra.Command {
@@ -167,6 +170,9 @@ func NewCommand(f client.Factory) *cobra.Command {
defaultSnapshotMoveData: false,
disableInformerCache: defaultDisableInformerCache,
scheduleSkipImmediately: false,
maintenanceCfg: repository.MaintenanceConfig{
KeepLatestMaitenanceJobs: repository.DefaultKeepLatestMaitenanceJobs,
},
}
)
@@ -240,7 +246,15 @@ func NewCommand(f client.Factory) *cobra.Command {
command.Flags().BoolVar(&config.defaultSnapshotMoveData, "default-snapshot-move-data", config.defaultSnapshotMoveData, "Move data by default for all snapshots supporting data movement.")
command.Flags().BoolVar(&config.disableInformerCache, "disable-informer-cache", config.disableInformerCache, "Disable informer cache for Get calls on restore. With this enabled, it will speed up restore in cases where there are backup resources which already exist in the cluster, but for very large clusters this will increase velero memory usage. Default is false (don't disable).")
command.Flags().BoolVar(&config.scheduleSkipImmediately, "schedule-skip-immediately", config.scheduleSkipImmediately, "Skip the first scheduled backup immediately after creating a schedule. Default is false (don't skip).")
command.Flags().IntVar(&config.maintenanceCfg.KeepLatestMaitenanceJobs, "keep-latest-maintenance-jobs", config.maintenanceCfg.KeepLatestMaitenanceJobs, "Number of latest maintenance jobs to keep for each repository. Optional.")
command.Flags().StringVar(&config.maintenanceCfg.CPURequest, "maintenance-job-cpu-request", config.maintenanceCfg.CPURequest, "CPU request for maintenance job. Default is no limit.")
command.Flags().StringVar(&config.maintenanceCfg.MemRequest, "maintenance-job-mem-request", config.maintenanceCfg.MemRequest, "Memory request for maintenance job. Default is no limit.")
command.Flags().StringVar(&config.maintenanceCfg.CPULimit, "maintenance-job-cpu-limit", config.maintenanceCfg.CPULimit, "CPU limit for maintenance job. Default is no limit.")
command.Flags().StringVar(&config.maintenanceCfg.MemLimit, "maintenance-job-mem-limit", config.maintenanceCfg.MemLimit, "Memory limit for maintenance job. Default is no limit.")
// The maintenance job inherits its log settings from the Velero server
config.maintenanceCfg.FormatFlag = config.formatFlag
config.maintenanceCfg.LogLevelFlag = logLevelFlag
return command
}
@@ -347,6 +361,14 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s
cancelFunc()
return nil, err
}
if err := batchv1api.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, err
}
if err := appsv1.AddToScheme(scheme); err != nil {
cancelFunc()
return nil, err
}
ctrl.SetLogger(logrusr.New(logger))
@@ -647,7 +669,7 @@ func (s *server) initRepoManager() error {
s.repoLocker = repository.NewRepoLocker()
s.repoEnsurer = repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout)
s.repoManager = repository.NewManager(s.namespace, s.mgr.GetClient(), s.repoLocker, s.repoEnsurer, s.credentialFileStore, s.credentialSecretStore, s.logger)
s.repoManager = repository.NewManager(s.namespace, s.mgr.GetClient(), s.repoLocker, s.repoEnsurer, s.credentialFileStore, s.credentialSecretStore, s.config.maintenanceCfg, s.logger)
return nil
}


@@ -26,6 +26,7 @@ import (
"k8s.io/klog/v2"
"github.com/vmware-tanzu/velero/pkg/cmd/cli/debug"
"github.com/vmware-tanzu/velero/pkg/cmd/cli/repomantenance"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/cmd/cli/backup"
@@ -122,6 +123,7 @@ operations can also be performed as 'velero backup get' and 'velero schedule cre
backuplocation.NewCommand(f),
snapshotlocation.NewCommand(f),
debug.NewCommand(f),
repomantenance.NewCommand(f),
)
// init and add the klog flags


@@ -189,10 +189,16 @@ func (r *BackupRepoReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
switch backupRepo.Status.Phase {
case velerov1api.BackupRepositoryPhaseNotReady:
ready, err := r.checkNotReadyRepo(ctx, backupRepo, log)
if err != nil {
return ctrl.Result{}, err
} else if !ready {
return ctrl.Result{}, nil
}
fallthrough
case velerov1api.BackupRepositoryPhaseReady:
return ctrl.Result{}, r.runMaintenanceIfDue(ctx, backupRepo, log)
case velerov1api.BackupRepositoryPhaseNotReady:
return ctrl.Result{}, r.checkNotReadyRepo(ctx, backupRepo, log)
}
return ctrl.Result{}, nil
@@ -277,8 +283,6 @@ func ensureRepo(repo *velerov1api.BackupRepository, repoManager repository.Manag
}
func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {
log.Debug("backupRepositoryController.runMaintenanceIfDue")
now := r.clock.Now()
if !dueForMaintenance(req, now) {
@@ -291,6 +295,7 @@ func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *vel
// prune failures should be displayed in the `.status.message` field but
// should not cause the repo to move to `NotReady`.
log.Debug("Pruning repo")
if err := r.repositoryManager.PruneRepo(req); err != nil {
log.WithError(err).Warn("error pruning repository")
return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
@@ -299,6 +304,7 @@ func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *vel
}
return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
rr.Status.Message = ""
rr.Status.LastMaintenanceTime = &metav1.Time{Time: now}
})
}
@@ -307,28 +313,32 @@ func dueForMaintenance(req *velerov1api.BackupRepository, now time.Time) bool {
return req.Status.LastMaintenanceTime == nil || req.Status.LastMaintenanceTime.Add(req.Spec.MaintenanceFrequency.Duration).Before(now)
}
func (r *BackupRepoReconciler) checkNotReadyRepo(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {
func (r *BackupRepoReconciler) checkNotReadyRepo(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) (bool, error) {
log.Info("Checking backup repository for readiness")
repoIdentifier, err := r.getIdentiferByBSL(ctx, req)
if err != nil {
return r.patchBackupRepository(ctx, req, repoNotReady(err.Error()))
return false, r.patchBackupRepository(ctx, req, repoNotReady(err.Error()))
}
if repoIdentifier != req.Spec.ResticIdentifier {
if err := r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) {
rr.Spec.ResticIdentifier = repoIdentifier
}); err != nil {
return err
return false, err
}
}
// we need to ensure it (first check, if check fails, attempt to init)
// because we don't know if it's been successfully initialized yet.
if err := ensureRepo(req, r.repositoryManager); err != nil {
return r.patchBackupRepository(ctx, req, repoNotReady(err.Error()))
return false, r.patchBackupRepository(ctx, req, repoNotReady(err.Error()))
}
return r.patchBackupRepository(ctx, req, repoReady())
err = r.patchBackupRepository(ctx, req, repoReady())
if err != nil {
return false, err
}
return true, nil
}
func repoNotReady(msg string) func(*velerov1api.BackupRepository) {


@@ -93,7 +93,7 @@ func TestCheckNotReadyRepo(t *testing.T) {
err = reconciler.Client.Create(context.TODO(), locations)
assert.NoError(t, err)
err = reconciler.checkNotReadyRepo(context.TODO(), rr, reconciler.logger)
_, err = reconciler.checkNotReadyRepo(context.TODO(), rr, reconciler.logger)
assert.NoError(t, err)
assert.Equal(t, rr.Status.Phase, velerov1api.BackupRepositoryPhaseReady)
assert.Equal(t, "s3:test.amazonaws.com/bucket/restic/volume-ns-1", rr.Spec.ResticIdentifier)


@@ -184,7 +184,7 @@ func (fs *fileSystemBR) Cancel() {
func (fs *fileSystemBR) boostRepoConnect(ctx context.Context, repositoryType string, credentialGetter *credentials.CredentialGetter) error {
if repositoryType == velerov1api.BackupRepositoryTypeKopia {
if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil {
if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.client, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil {
return err
}
} else {


@@ -27,6 +27,7 @@ import (
"github.com/vmware-tanzu/velero/internal/velero"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/repository"
)
type podTemplateOption func(*podTemplateConfig)
@@ -51,6 +52,7 @@ type podTemplateConfig struct {
privilegedNodeAgent bool
disableInformerCache bool
scheduleSkipImmediately bool
maintenanceConfig repository.MaintenanceConfig
}
func WithImage(image string) podTemplateOption {
@@ -177,6 +179,12 @@ func WithScheduleSkipImmediately(b bool) podTemplateOption {
}
}
func WithMaintenanceConfig(config repository.MaintenanceConfig) podTemplateOption {
return func(c *podTemplateConfig) {
c.maintenanceConfig = config
}
}
func Deployment(namespace string, opts ...podTemplateOption) *appsv1.Deployment {
// TODO: Add support for server args
c := &podTemplateConfig{
@@ -234,6 +242,26 @@ func Deployment(namespace string, opts ...podTemplateOption) *appsv1.Deployment
args = append(args, fmt.Sprintf("--fs-backup-timeout=%v", c.podVolumeOperationTimeout))
}
if c.maintenanceConfig.KeepLatestMaitenanceJobs > 0 {
args = append(args, fmt.Sprintf("--keep-latest-maintenance-jobs=%d", c.maintenanceConfig.KeepLatestMaitenanceJobs))
}
if c.maintenanceConfig.CPULimit != "" {
args = append(args, fmt.Sprintf("--maintenance-job-cpu-limit=%s", c.maintenanceConfig.CPULimit))
}
if c.maintenanceConfig.CPURequest != "" {
args = append(args, fmt.Sprintf("--maintenance-job-cpu-request=%s", c.maintenanceConfig.CPURequest))
}
if c.maintenanceConfig.MemLimit != "" {
args = append(args, fmt.Sprintf("--maintenance-job-mem-limit=%s", c.maintenanceConfig.MemLimit))
}
if c.maintenanceConfig.MemRequest != "" {
args = append(args, fmt.Sprintf("--maintenance-job-mem-request=%s", c.maintenanceConfig.MemRequest))
}
deployment := &appsv1.Deployment{
ObjectMeta: objectMeta(namespace, "velero"),
TypeMeta: metav1.TypeMeta{


@@ -22,6 +22,8 @@ import (
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"github.com/vmware-tanzu/velero/pkg/repository"
)
func TestDeployment(t *testing.T) {
@@ -68,4 +70,18 @@ func TestDeployment(t *testing.T) {
deploy = Deployment("velero", WithDisableInformerCache())
assert.Len(t, deploy.Spec.Template.Spec.Containers[0].Args, 2)
assert.Equal(t, "--disable-informer-cache=true", deploy.Spec.Template.Spec.Containers[0].Args[1])
deploy = Deployment("velero", WithMaintenanceConfig(repository.MaintenanceConfig{
KeepLatestMaitenanceJobs: 3,
CPURequest: "100m",
MemRequest: "256Mi",
CPULimit: "200m",
MemLimit: "512Mi",
}))
assert.Len(t, deploy.Spec.Template.Spec.Containers[0].Args, 6)
assert.Equal(t, "--keep-latest-maintenance-jobs=3", deploy.Spec.Template.Spec.Containers[0].Args[1])
assert.Equal(t, "--maintenance-job-cpu-limit=200m", deploy.Spec.Template.Spec.Containers[0].Args[2])
assert.Equal(t, "--maintenance-job-cpu-request=100m", deploy.Spec.Template.Spec.Containers[0].Args[3])
assert.Equal(t, "--maintenance-job-mem-limit=512Mi", deploy.Spec.Template.Spec.Containers[0].Args[4])
assert.Equal(t, "--maintenance-job-mem-request=256Mi", deploy.Spec.Template.Spec.Containers[0].Args[5])
}


@@ -30,6 +30,8 @@ import (
v1crds "github.com/vmware-tanzu/velero/config/crd/v1/crds"
v2alpha1crds "github.com/vmware-tanzu/velero/config/crd/v2alpha1/crds"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/util/logging"
)
const (
@@ -261,6 +263,9 @@ type VeleroOptions struct {
DefaultSnapshotMoveData bool
DisableInformerCache bool
ScheduleSkipImmediately bool
FormatFlag *logging.FormatFlag
LogLevelFlag *logging.LevelFlag
MaintenanceCfg repository.MaintenanceConfig
}
func AllCRDs() *unstructured.UnstructuredList {
@@ -345,6 +350,7 @@ func AllResources(o *VeleroOptions) *unstructured.UnstructuredList {
WithPodVolumeOperationTimeout(o.PodVolumeOperationTimeout),
WithUploaderType(o.UploaderType),
WithScheduleSkipImmediately(o.ScheduleSkipImmediately),
WithMaintenanceConfig(o.MaintenanceCfg),
}
if len(o.Features) > 0 {


@@ -0,0 +1,258 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package repository
import (
"context"
"fmt"
"sort"
"time"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/pkg/errors"
"github.com/vmware-tanzu/velero/pkg/repository/provider"
"github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/logging"
veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero"
)
const RepositoryNameLabel = "velero.io/repo-name"
const DefaultKeepLatestMaitenanceJobs = 3
const DefaultMaintenanceJobCPURequest = "0"
const DefaultMaintenanceJobCPULimit = "0"
const DefaultMaintenanceJobMemRequest = "0"
const DefaultMaintenanceJobMemLimit = "0"
// MaintenanceConfig is the configuration for the repo maintenance job
type MaintenanceConfig struct {
KeepLatestMaitenanceJobs int
CPURequest string
MemRequest string
CPULimit string
MemLimit string
LogLevelFlag *logging.LevelFlag
FormatFlag *logging.FormatFlag
}
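// Illustrative only, not part of this change: a typical MaintenanceConfig as
// the server would build it from the new flags; the resource values here are
// made-up examples rather than Velero defaults.
func exampleMaintenanceConfig() MaintenanceConfig {
    return MaintenanceConfig{
        KeepLatestMaitenanceJobs: DefaultKeepLatestMaitenanceJobs, // keep the 3 newest jobs per repository
        CPURequest:               "100m",
        MemRequest:               "256Mi",
        CPULimit:                 "500m",
        MemLimit:                 "512Mi",
        // LogLevelFlag and FormatFlag are inherited from the server's own log flags.
    }
}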
func generateJobName(repo string) string {
millisecond := time.Now().UTC().UnixMilli() // millisecond
jobName := fmt.Sprintf("%s-maintain-job-%d", repo, millisecond)
if len(jobName) > 63 { // k8s job name length limit
jobName = fmt.Sprintf("repo-maintain-job-%d", millisecond)
}
return jobName
}
func buildMaintenanceJob(m MaintenanceConfig, param provider.RepoParam, cli client.Client, namespace string) (*batchv1.Job, error) {
// Get the Velero server deployment
deployment := &appsv1.Deployment{}
err := cli.Get(context.TODO(), types.NamespacedName{Name: "velero", Namespace: namespace}, deployment)
if err != nil {
return nil, err
}
// Get the environment variables from the Velero server deployment
envVars := veleroutil.GetEnvVarsFromVeleroServer(deployment)
// Get the volume mounts from the Velero server deployment
volumeMounts := veleroutil.GetVolumeMountsFromVeleroServer(deployment)
// Get the volumes from the Velero server deployment
volumes := veleroutil.GetVolumesFromVeleroServer(deployment)
// Get the service account from the Velero server deployment
serviceAccount := veleroutil.GetServiceAccountFromVeleroServer(deployment)
// Get image
image := veleroutil.GetVeleroServerImage(deployment)
// Set resource limits and requests
if m.CPURequest == "" {
m.CPURequest = DefaultMaintenanceJobCPURequest
}
if m.MemRequest == "" {
m.MemRequest = DefaultMaintenanceJobMemRequest
}
if m.CPULimit == "" {
m.CPULimit = DefaultMaintenanceJobCPULimit
}
if m.MemLimit == "" {
m.MemLimit = DefaultMaintenanceJobMemLimit
}
resources, err := kube.ParseResourceRequirements(m.CPURequest, m.MemRequest, m.CPULimit, m.MemLimit)
if err != nil {
return nil, errors.Wrap(err, "failed to parse resource requirements for maintenance job")
}
// Set arguments
args := []string{"repo-maintenance"}
args = append(args, fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace))
args = append(args, fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType))
args = append(args, fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name))
args = append(args, fmt.Sprintf("--log-level=%s", m.LogLevelFlag.String()))
args = append(args, fmt.Sprintf("--log-format=%s", m.FormatFlag.String()))
// build the maintenance job
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: generateJobName(param.BackupRepo.Name),
Namespace: param.BackupRepo.Namespace,
Labels: map[string]string{
RepositoryNameLabel: param.BackupRepo.Name,
},
},
Spec: batchv1.JobSpec{
BackoffLimit: new(int32), // Never retry
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "velero-repo-maintenance-pod",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "velero-repo-maintenance-container",
Image: image,
Command: []string{
"/velero",
},
Args: args,
ImagePullPolicy: v1.PullIfNotPresent,
Env: envVars,
VolumeMounts: volumeMounts,
Resources: resources,
},
},
RestartPolicy: v1.RestartPolicyNever,
Volumes: volumes,
ServiceAccountName: serviceAccount,
},
},
},
}
if affinity := veleroutil.GetAffinityFromVeleroServer(deployment); affinity != nil {
job.Spec.Template.Spec.Affinity = affinity
}
if tolerations := veleroutil.GetTolerationsFromVeleroServer(deployment); tolerations != nil {
job.Spec.Template.Spec.Tolerations = tolerations
}
if nodeSelector := veleroutil.GetNodeSelectorFromVeleroServer(deployment); nodeSelector != nil {
job.Spec.Template.Spec.NodeSelector = nodeSelector
}
if labels := veleroutil.GetVeleroServerLables(deployment); len(labels) > 0 {
job.Spec.Template.Labels = labels
}
if annotations := veleroutil.GetVeleroServerAnnotations(deployment); len(annotations) > 0 {
job.Spec.Template.Annotations = annotations
}
return job, nil
}
// deleteOldMaintenanceJobs deletes old maintenance jobs and keeps the latest N jobs
func deleteOldMaintenanceJobs(cli client.Client, repo string, keep int) error {
// Get the maintenance job list by label
jobList := &batchv1.JobList{}
err := cli.List(context.TODO(), jobList, client.MatchingLabels(map[string]string{RepositoryNameLabel: repo}))
if err != nil {
return err
}
// Delete old maintenance jobs
if len(jobList.Items) > keep {
sort.Slice(jobList.Items, func(i, j int) bool {
return jobList.Items[i].CreationTimestamp.Before(&jobList.Items[j].CreationTimestamp)
})
for i := 0; i < len(jobList.Items)-keep; i++ {
err = cli.Delete(context.TODO(), &jobList.Items[i], client.PropagationPolicy(metav1.DeletePropagationBackground))
if err != nil {
return err
}
}
}
return nil
}
func waitForJobComplete(ctx context.Context, client client.Client, job *batchv1.Job) error {
return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
err := client.Get(ctx, types.NamespacedName{Namespace: job.Namespace, Name: job.Name}, job)
if err != nil && !apierrors.IsNotFound(err) {
return false, err
}
if job.Status.Succeeded > 0 {
return true, nil
}
if job.Status.Failed > 0 {
return true, fmt.Errorf("maintenance job %s/%s failed", job.Namespace, job.Name)
}
return false, nil
})
}
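// Illustrative only, not part of this change: a caller could bound the wait
// with a context timeout so a stuck maintenance Job cannot block indefinitely;
// the 30-minute value is an assumption, not a Velero default.
func exampleWaitWithTimeout(cli client.Client, job *batchv1.Job) error {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
    defer cancel()
    return waitForJobComplete(ctx, cli, job)
}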
func getMaintenanceResultFromJob(cli client.Client, job *batchv1.Job) (string, error) {
// Get the maintenance job related pod by label selector
podList := &v1.PodList{}
err := cli.List(context.TODO(), podList, client.InNamespace(job.Namespace), client.MatchingLabels(map[string]string{"job-name": job.Name}))
if err != nil {
return "", err
}
if len(podList.Items) == 0 {
return "", fmt.Errorf("no pod found for job %s", job.Name)
}
// we only have one maintenance pod for the job
return podList.Items[0].Status.ContainerStatuses[0].State.Terminated.Message, nil
}
func GetLatestMaintenanceJob(cli client.Client, repo string) (*batchv1.Job, error) {
// Get the maintenance job list by label
jobList := &batchv1.JobList{}
err := cli.List(context.TODO(), jobList, client.MatchingLabels(map[string]string{RepositoryNameLabel: repo}))
if err != nil {
return nil, err
}
if len(jobList.Items) == 0 {
return nil, nil
}
// Get the latest maintenance job
sort.Slice(jobList.Items, func(i, j int) bool {
return jobList.Items[i].CreationTimestamp.Time.After(jobList.Items[j].CreationTimestamp.Time)
})
return &jobList.Items[0], nil
}


@@ -0,0 +1,408 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package repository
import (
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/repository/provider"
"github.com/vmware-tanzu/velero/pkg/util/logging"
)
func TestGenerateJobName1(t *testing.T) {
testCases := []struct {
repo string
expectedStart string
}{
{
repo: "example",
expectedStart: "example-maintain-job-",
},
{
repo: strings.Repeat("a", 60),
expectedStart: "repo-maintain-job-",
},
}
for _, tc := range testCases {
t.Run(tc.repo, func(t *testing.T) {
// Call the function to test
jobName := generateJobName(tc.repo)
// Check if the generated job name starts with the expected prefix
if !strings.HasPrefix(jobName, tc.expectedStart) {
t.Errorf("generated job name does not start with expected prefix")
}
// Check if the length of the generated job name exceeds the Kubernetes limit
if len(jobName) > 63 {
t.Errorf("generated job name exceeds Kubernetes limit")
}
})
}
}
func TestDeleteOldMaintenanceJobs(t *testing.T) {
// Set up test repo and keep value
repo := "test-repo"
keep := 2
// Create some maintenance jobs for testing
var objs []client.Object
// Create a newer job
newerJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job1",
Namespace: "default",
Labels: map[string]string{RepositoryNameLabel: repo},
},
Spec: batchv1.JobSpec{},
}
objs = append(objs, newerJob)
// Create older jobs
for i := 2; i <= 3; i++ {
olderJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("job%d", i),
Namespace: "default",
Labels: map[string]string{RepositoryNameLabel: repo},
CreationTimestamp: metav1.Time{
Time: metav1.Now().Add(time.Duration(-24*i) * time.Hour),
},
},
Spec: batchv1.JobSpec{},
}
objs = append(objs, olderJob)
}
// Create a fake Kubernetes client
scheme := runtime.NewScheme()
_ = batchv1.AddToScheme(scheme)
cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build()
// Call the function
err := deleteOldMaintenanceJobs(cli, repo, keep)
assert.NoError(t, err)
// Get the remaining jobs
jobList := &batchv1.JobList{}
err = cli.List(context.TODO(), jobList, client.MatchingLabels(map[string]string{RepositoryNameLabel: repo}))
assert.NoError(t, err)
// We expect the number of jobs to be equal to 'keep'
assert.Equal(t, keep, len(jobList.Items))
// We expect that the oldest jobs were deleted
// Job3 should not be present in the remaining list
assert.NotContains(t, jobList.Items, objs[2])
// Job2 should also not be present in the remaining list
assert.NotContains(t, jobList.Items, objs[1])
}
func TestWaitForJobComplete(t *testing.T) {
// Set up test job
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-job",
Namespace: "default",
},
Status: batchv1.JobStatus{},
}
// Define test cases
tests := []struct {
description string // Test case description
jobStatus batchv1.JobStatus // Job status to set for the test
expectError bool // Whether an error is expected
}{
{
description: "Job Succeeded",
jobStatus: batchv1.JobStatus{
Succeeded: 1,
},
expectError: false,
},
{
description: "Job Failed",
jobStatus: batchv1.JobStatus{
Failed: 1,
},
expectError: true,
},
}
// Run tests
for _, tc := range tests {
t.Run(tc.description, func(t *testing.T) {
// Set the job status
job.Status = tc.jobStatus
// Create a fake Kubernetes client
cli := fake.NewClientBuilder().WithObjects(job).Build()
// Call the function
err := waitForJobComplete(context.Background(), cli, job)
// Check if the error matches the expectation
if tc.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}
func TestGetMaintenanceResultFromJob(t *testing.T) {
// Set up test job
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-job",
Namespace: "default",
},
}
// Set up test pod
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "default",
Labels: map[string]string{"job-name": job.Name},
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
Message: "test message",
},
},
},
},
},
}
// Create a fake Kubernetes client
cli := fake.NewClientBuilder().WithObjects(job, pod).Build()
// Call the function
result, err := getMaintenanceResultFromJob(cli, job)
// Check if the result and error match the expectation
assert.NoError(t, err)
assert.Equal(t, "test message", result)
}
func TestGetLatestMaintenanceJob(t *testing.T) {
// Set up test repo
repo := "test-repo"
// Create some maintenance jobs for testing
var objs []client.Object
// Create a newer job
newerJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job1",
Namespace: "default",
Labels: map[string]string{RepositoryNameLabel: repo},
CreationTimestamp: metav1.Time{
Time: metav1.Now().Add(time.Duration(-24) * time.Hour),
},
},
Spec: batchv1.JobSpec{},
}
objs = append(objs, newerJob)
// Create an older job
olderJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job2",
Namespace: "default",
Labels: map[string]string{RepositoryNameLabel: repo},
},
Spec: batchv1.JobSpec{},
}
objs = append(objs, olderJob)
// Create a fake Kubernetes client
scheme := runtime.NewScheme()
_ = batchv1.AddToScheme(scheme)
cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build()
// Call the function
job, err := GetLatestMaintenanceJob(cli, repo)
assert.NoError(t, err)
// We expect the returned job to be the newer job
assert.Equal(t, newerJob.Name, job.Name)
}
func TestBuildMaintenanceJob(t *testing.T) {
testCases := []struct {
name string
m MaintenanceConfig
deploy *appsv1.Deployment
expectedJobName string
expectedError bool
}{
{
name: "Valid maintenance job",
m: MaintenanceConfig{
CPURequest: "100m",
MemRequest: "128Mi",
CPULimit: "200m",
MemLimit: "256Mi",
LogLevelFlag: logging.LogLevelFlag(logrus.InfoLevel),
FormatFlag: logging.NewFormatFlag(),
},
deploy: &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "velero",
Namespace: "velero",
},
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "velero-repo-maintenance-container",
Image: "velero-image",
},
},
},
},
},
},
expectedJobName: "test-123-maintain-job",
expectedError: false,
},
{
name: "Error getting Velero server deployment",
m: MaintenanceConfig{
CPURequest: "100m",
MemRequest: "128Mi",
CPULimit: "200m",
MemLimit: "256Mi",
LogLevelFlag: logging.LogLevelFlag(logrus.InfoLevel),
FormatFlag: logging.NewFormatFlag(),
},
expectedJobName: "",
expectedError: true,
},
}
param := provider.RepoParam{
BackupRepo: &velerov1api.BackupRepository{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "test-123",
},
Spec: velerov1api.BackupRepositorySpec{
VolumeNamespace: "test-123",
RepositoryType: "kopia",
},
},
BackupLocation: &velerov1api.BackupStorageLocation{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "test-location",
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Create a fake clientset with resources
objs := []runtime.Object{param.BackupLocation, param.BackupRepo}
if tc.deploy != nil {
objs = append(objs, tc.deploy)
}
scheme := runtime.NewScheme()
_ = appsv1.AddToScheme(scheme)
_ = velerov1api.AddToScheme(scheme)
cli := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objs...).Build()
// Call the function to test
job, err := buildMaintenanceJob(tc.m, param, cli, "velero")
// Check the error
if tc.expectedError {
assert.Error(t, err)
assert.Nil(t, job)
} else {
assert.NoError(t, err)
assert.NotNil(t, job)
assert.Contains(t, job.Name, tc.expectedJobName)
assert.Equal(t, param.BackupRepo.Namespace, job.Namespace)
assert.Equal(t, param.BackupRepo.Name, job.Labels[RepositoryNameLabel])
// Check container
assert.Len(t, job.Spec.Template.Spec.Containers, 1)
container := job.Spec.Template.Spec.Containers[0]
assert.Equal(t, "velero-repo-maintenance-container", container.Name)
assert.Equal(t, "velero-image", container.Image)
assert.Equal(t, v1.PullIfNotPresent, container.ImagePullPolicy)
// Check resources
expectedResources := v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse(tc.m.CPURequest),
v1.ResourceMemory: resource.MustParse(tc.m.MemRequest),
},
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse(tc.m.CPULimit),
v1.ResourceMemory: resource.MustParse(tc.m.MemLimit),
},
}
assert.Equal(t, expectedResources, container.Resources)
// Check args
expectedArgs := []string{
"repo-maintenance",
fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace),
fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType),
fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name),
fmt.Sprintf("--log-level=%s", tc.m.LogLevelFlag.String()),
fmt.Sprintf("--log-format=%s", tc.m.FormatFlag.String()),
}
assert.Equal(t, expectedArgs, container.Args)
// Check affinity
assert.Nil(t, job.Spec.Template.Spec.Affinity)
// Check tolerations
assert.Nil(t, job.Spec.Template.Spec.Tolerations)
// Check node selector
assert.Nil(t, job.Spec.Template.Spec.NodeSelector)
}
})
}
}


@@ -92,13 +92,14 @@ type Manager interface {
}
type manager struct {
namespace string
providers map[string]provider.Provider
client client.Client
repoLocker *RepoLocker
repoEnsurer *Ensurer
fileSystem filesystem.Interface
log logrus.FieldLogger
namespace string
providers map[string]provider.Provider
client client.Client
repoLocker *RepoLocker
repoEnsurer *Ensurer
fileSystem filesystem.Interface
maintenanceCfg MaintenanceConfig
log logrus.FieldLogger
}
// NewManager create a new repository manager.
@@ -109,23 +110,25 @@ func NewManager(
repoEnsurer *Ensurer,
credentialFileStore credentials.FileStore,
credentialSecretStore credentials.SecretStore,
maintenanceCfg MaintenanceConfig,
log logrus.FieldLogger,
) Manager {
mgr := &manager{
namespace: namespace,
client: client,
providers: map[string]provider.Provider{},
repoLocker: repoLocker,
repoEnsurer: repoEnsurer,
fileSystem: filesystem.NewFileSystem(),
log: log,
namespace: namespace,
client: client,
providers: map[string]provider.Provider{},
repoLocker: repoLocker,
repoEnsurer: repoEnsurer,
fileSystem: filesystem.NewFileSystem(),
maintenanceCfg: maintenanceCfg,
log: log,
}
mgr.providers[velerov1api.BackupRepositoryTypeRestic] = provider.NewResticRepositoryProvider(credentialFileStore, mgr.fileSystem, mgr.log)
mgr.providers[velerov1api.BackupRepositoryTypeKopia] = provider.NewUnifiedRepoProvider(credentials.CredentialGetter{
FromFile: credentialFileStore,
FromSecret: credentialSecretStore,
}, velerov1api.BackupRepositoryTypeKopia, mgr.log)
}, velerov1api.BackupRepositoryTypeKopia, client, mgr.log)
return mgr
}
@@ -192,7 +195,55 @@ func (m *manager) PruneRepo(repo *velerov1api.BackupRepository) error {
return errors.WithStack(err)
}
return prd.PruneRepo(context.Background(), param)
log := m.log.WithFields(logrus.Fields{
"BSL name": param.BackupLocation.Name,
"repo type": param.BackupRepo.Spec.RepositoryType,
"repo name": param.BackupRepo.Name,
"repo UID": param.BackupRepo.UID,
})
log.Info("Start to maintence repo")
maintenanceJob, err := buildMaintenanceJob(m.maintenanceCfg, param, m.client, m.namespace)
if err != nil {
return errors.Wrap(err, "error to build maintenance job")
}
log = log.WithField("job", fmt.Sprintf("%s/%s", maintenanceJob.Namespace, maintenanceJob.Name))
if err := m.client.Create(context.TODO(), maintenanceJob); err != nil {
return errors.Wrap(err, "error to create maintenance job")
}
log.Debug("Creating maintenance job")
defer func() {
if err := deleteOldMaintenanceJobs(m.client, param.BackupRepo.Name,
m.maintenanceCfg.KeepLatestMaitenanceJobs); err != nil {
log.WithError(err).Error("Failed to delete maintenance job")
}
}()
var jobErr error
if err := waitForJobComplete(context.TODO(), m.client, maintenanceJob); err != nil {
log.WithError(err).Error("Error to wait for maintenance job complete")
jobErr = err // we won't return here for job may failed by maintenance failure, we want return the actual error
}
result, err := getMaintenanceResultFromJob(m.client, maintenanceJob)
if err != nil {
return errors.Wrap(err, "error to get maintenance job result")
}
if result != "" {
return errors.New(fmt.Sprintf("Maintenance job %s failed: %s", maintenanceJob.Name, result))
}
if jobErr != nil {
return errors.Wrap(jobErr, "error to wait for maintenance job complete")
}
log.Info("Maintenance repo complete")
return nil
}
func (m *manager) UnlockRepo(repo *velerov1api.BackupRepository) error {


@@ -21,12 +21,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)
func TestGetRepositoryProvider(t *testing.T) {
mgr := NewManager("", nil, nil, nil, nil, nil, nil).(*manager)
var fakeClient kbclient.Client
mgr := NewManager("", fakeClient, nil, nil, nil, nil, MaintenanceConfig{}, nil).(*manager)
repo := &velerov1.BackupRepository{}
// empty repository type


@@ -29,6 +29,7 @@ import (
"github.com/kopia/kopia/repo"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
@@ -43,6 +44,7 @@ type unifiedRepoProvider struct {
workPath string
repoService udmrepo.BackupRepoService
repoBackend string
cli client.Client
log logrus.FieldLogger
}
@@ -73,11 +75,13 @@ const (
func NewUnifiedRepoProvider(
credentialGetter credentials.CredentialGetter,
repoBackend string,
cli client.Client,
log logrus.FieldLogger,
) Provider {
repo := unifiedRepoProvider{
credentialGetter: credentialGetter,
repoBackend: repoBackend,
cli: cli,
log: log,
}

pkg/util/velero/velero.go

@@ -0,0 +1,80 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package velero
import (
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
)
// GetNodeSelectorFromVeleroServer get the node selector from the Velero server deployment
func GetNodeSelectorFromVeleroServer(deployment *appsv1.Deployment) map[string]string {
return deployment.Spec.Template.Spec.NodeSelector
}
// GetTolerationsFromVeleroServer get the tolerations from the Velero server deployment
func GetTolerationsFromVeleroServer(deployment *appsv1.Deployment) []v1.Toleration {
return deployment.Spec.Template.Spec.Tolerations
}
// GetAffinityFromVeleroServer get the affinity from the Velero server deployment
func GetAffinityFromVeleroServer(deployment *appsv1.Deployment) *v1.Affinity {
return deployment.Spec.Template.Spec.Affinity
}
// GetEnvVarsFromVeleroServer get the environment variables from the Velero server deployment
func GetEnvVarsFromVeleroServer(deployment *appsv1.Deployment) []v1.EnvVar {
for _, container := range deployment.Spec.Template.Spec.Containers {
// We only have one container in the Velero server deployment
return container.Env
}
return nil
}
// GetVolumeMountsFromVeleroServer get the volume mounts from the Velero server deployment
func GetVolumeMountsFromVeleroServer(deployment *appsv1.Deployment) []v1.VolumeMount {
for _, container := range deployment.Spec.Template.Spec.Containers {
// We only have one container in the Velero server deployment
return container.VolumeMounts
}
return nil
}
// GetVolumesFromVeleroServer get the volumes from the Velero server deployment
func GetVolumesFromVeleroServer(deployment *appsv1.Deployment) []v1.Volume {
return deployment.Spec.Template.Spec.Volumes
}
// GetServiceAccountFromVeleroServer get the service account from the Velero server deployment
func GetServiceAccountFromVeleroServer(deployment *appsv1.Deployment) string {
return deployment.Spec.Template.Spec.ServiceAccountName
}
// GetVeleroServerImage gets the image of the Velero server deployment
func GetVeleroServerImage(deployment *appsv1.Deployment) string {
return deployment.Spec.Template.Spec.Containers[0].Image
}
// GetVeleroServerLables get the labels of the Velero server deployment
func GetVeleroServerLables(deployment *appsv1.Deployment) map[string]string {
return deployment.Spec.Template.Labels
}
// GetVeleroServerAnnotations get the annotations of the Velero server deployment
func GetVeleroServerAnnotations(deployment *appsv1.Deployment) map[string]string {
return deployment.Spec.Template.Annotations
}


@@ -0,0 +1,614 @@
/*
Copyright the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package velero
import (
"reflect"
"testing"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestGetNodeSelectorFromVeleroServer(t *testing.T) {
tests := []struct {
name string
deploy *appsv1.Deployment
want map[string]string
}{
{
name: "no node selector",
deploy: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
NodeSelector: map[string]string{},
},
},
},
},
want: map[string]string{},
},
{
name: "node selector",
deploy: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
NodeSelector: map[string]string{
"foo": "bar",
},
},
},
},
},
want: map[string]string{
"foo": "bar",
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := GetNodeSelectorFromVeleroServer(test.deploy)
if len(got) != len(test.want) {
t.Errorf("expected node selector to have %d elements, got %d", len(test.want), len(got))
}
for k, v := range test.want {
if got[k] != v {
t.Errorf("expected node selector to have key %s with value %s, got %s", k, v, got[k])
}
}
})
}
}
func TestGetTolerationsFromVeleroServer(t *testing.T) {
tests := []struct {
name string
deploy *appsv1.Deployment
want []v1.Toleration
}{
{
name: "no tolerations",
deploy: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Tolerations: []v1.Toleration{},
},
},
},
},
want: []v1.Toleration{},
},
{
name: "tolerations",
deploy: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Tolerations: []v1.Toleration{
{
Key: "foo",
Operator: "Exists",
},
},
},
},
},
},
want: []v1.Toleration{
{
Key: "foo",
Operator: "Exists",
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := GetTolerationsFromVeleroServer(test.deploy)
if len(got) != len(test.want) {
t.Errorf("expected tolerations to have %d elements, got %d", len(test.want), len(got))
}
for i, want := range test.want {
if got[i] != want {
t.Errorf("expected toleration at index %d to be %v, got %v", i, want, got[i])
}
}
})
}
}
func TestGetAffinityFromVeleroServer(t *testing.T) {
tests := []struct {
name string
deploy *appsv1.Deployment
want *v1.Affinity
}{
{
name: "no affinity",
deploy: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Affinity: nil,
},
},
},
},
want: nil,
},
{
name: "affinity",
deploy: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "foo",
Operator: "In",
Values: []string{"bar"},
},
},
},
},
},
},
},
},
},
},
},
want: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "foo",
Operator: "In",
Values: []string{"bar"},
},
},
},
},
},
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := GetAffinityFromVeleroServer(test.deploy)
if got == nil {
if test.want != nil {
t.Errorf("expected affinity to be %v, got nil", test.want)
}
} else {
if test.want == nil {
t.Errorf("expected affinity to be nil, got %v", got)
} else {
if got.NodeAffinity == nil {
if test.want.NodeAffinity != nil {
t.Errorf("expected node affinity to be %v, got nil", test.want.NodeAffinity)
}
} else {
if test.want.NodeAffinity == nil {
t.Errorf("expected node affinity to be nil, got %v", got.NodeAffinity)
} else {
if got.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
if test.want.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
t.Errorf("expected required during scheduling ignored during execution to be %v, got nil", test.want.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)
}
} else {
if test.want.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
t.Errorf("expected required during scheduling ignored during execution to be nil, got %v", got.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)
} else {
if !reflect.DeepEqual(got.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution, test.want.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution) {
t.Errorf("expected required during scheduling ignored during execution to be %v, got %v", test.want.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution, got.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)
}
}
}
}
}
}
}
})
}
}
func TestGetEnvVarsFromVeleroServer(t *testing.T) {
tests := []struct {
name string
deploy *appsv1.Deployment
want []v1.EnvVar
}{
{
name: "no env vars",
deploy: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Env: []v1.EnvVar{},
},
},
},
},
},
},
want: []v1.EnvVar{},
},
{
name: "env vars",
deploy: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Env: []v1.EnvVar{
{
Name: "foo",
Value: "bar",
},
},
},
},
},
},
},
},
want: []v1.EnvVar{
{
Name: "foo",
Value: "bar",
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := GetEnvVarsFromVeleroServer(test.deploy)
if len(got) != len(test.want) {
t.Errorf("expected env vars to have %d elements, got %d", len(test.want), len(got))
}
for i, want := range test.want {
if got[i] != want {
t.Errorf("expected env var at index %d to be %v, got %v", i, want, got[i])
}
}
})
}
}
func TestGetVolumeMountsFromVeleroServer(t *testing.T) {
tests := []struct {
name string
deploy *appsv1.Deployment
want []v1.VolumeMount
}{
{
name: "no volume mounts",
deploy: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
VolumeMounts: []v1.VolumeMount{},
},
},
},
},
},
},
want: []v1.VolumeMount{},
},
{
name: "volume mounts",
deploy: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
VolumeMounts: []v1.VolumeMount{
{
Name: "foo",
MountPath: "/bar",
},
},
},
},
},
},
},
},
want: []v1.VolumeMount{
{
Name: "foo",
MountPath: "/bar",
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := GetVolumeMountsFromVeleroServer(test.deploy)
if len(got) != len(test.want) {
t.Errorf("expected volume mounts to have %d elements, got %d", len(test.want), len(got))
}
for i, want := range test.want {
if got[i] != want {
t.Errorf("expected volume mount at index %d to be %v, got %v", i, want, got[i])
}
}
})
}
}
func TestGetVolumesFromVeleroServer(t *testing.T) {
tests := []struct {
name string
deploy *appsv1.Deployment
want []v1.Volume
}{
{
name: "no volumes",
deploy: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Volumes: []v1.Volume{},
},
},
},
},
want: []v1.Volume{},
},
{
name: "volumes",
deploy: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "foo",
},
},
},
},
},
},
want: []v1.Volume{
{
Name: "foo",
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := GetVolumesFromVeleroServer(test.deploy)
if len(got) != len(test.want) {
t.Errorf("expected volumes to have %d elements, got %d", len(test.want), len(got))
}
for i, want := range test.want {
if got[i] != want {
t.Errorf("expected volume at index %d to be %v, got %v", i, want, got[i])
}
}
})
}
}
func TestGetServiceAccountFromVeleroServer(t *testing.T) {
tests := []struct {
name string
deploy *appsv1.Deployment
want string
}{
{
name: "no service account",
deploy: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
ServiceAccountName: "",
},
},
},
},
want: "",
},
{
name: "service account",
deploy: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
ServiceAccountName: "foo",
},
},
},
},
want: "foo",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := GetServiceAccountFromVeleroServer(test.deploy)
if got != test.want {
t.Errorf("expected service account to be %s, got %s", test.want, got)
}
})
}
}
func TestGetVeleroServerImage(t *testing.T) {
tests := []struct {
name string
deploy *appsv1.Deployment
want string
}{
{
name: "velero server image",
deploy: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Image: "velero/velero:latest",
},
},
},
},
},
},
want: "velero/velero:latest",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := GetVeleroServerImage(test.deploy)
if got != test.want {
t.Errorf("expected velero server image to be %s, got %s", test.want, got)
}
})
}
}
func TestGetVeleroServerLables(t *testing.T) {
tests := []struct {
name string
deployment *appsv1.Deployment
expected map[string]string
}{
{
name: "Empty Labels",
deployment: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{},
},
},
},
},
expected: map[string]string{},
},
{
name: "Non-empty Labels",
deployment: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "velero",
"component": "server",
},
},
},
},
},
expected: map[string]string{
"app": "velero",
"component": "server",
},
},
}
// Run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := GetVeleroServerLables(tt.deployment)
assert.Equal(t, tt.expected, result)
})
}
}
func TestGetVeleroServerAnnotations(t *testing.T) {
tests := []struct {
name string
deployment *appsv1.Deployment
expected map[string]string
}{
{
name: "Empty Labels",
deployment: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{},
},
},
},
},
expected: map[string]string{},
},
{
name: "Non-empty Labels",
deployment: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"app": "velero",
"component": "server",
},
},
},
},
},
expected: map[string]string{
"app": "velero",
"component": "server",
},
},
}
// Run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := GetVeleroServerAnnotations(tt.deployment)
assert.Equal(t, tt.expected, result)
})
}
}