Merge pull request #6173 from blackpiglet/linter_revive
Enable linter revive and resolve found errors: part1
changelogs/unreleased/6173-blackpiglet (new file, 1 line)
@@ -0,0 +1 @@
+Enable linter revive and resolve found errors: part 1
@@ -247,6 +247,11 @@ linters-settings:
     require-explanation: true
     # Enable to require nolint directives to mention the specific linter being suppressed. Default is false.
     require-specific: true
+  revive:
+    rules:
+      - name: unexported-return
+        disabled: true
+
   rowserrcheck:
     packages:
       - github.com/jmoiron/sqlx
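Note: with require-specific set to true, a bare //nolint comment no longer passes; the directive must name the linter it suppresses, e.g. //nolint:staticcheck. The unexported-return rule is the one revive check this PR opts out of; as a hypothetical sketch (not code from this commit), it flags exported constructors that return unexported types, a pattern that is often intentional:

    package widget

    type widget struct{ name string }

    // New would be flagged by revive's unexported-return rule: it is
    // exported but returns the unexported type *widget, which callers
    // outside the package cannot name directly.
    func New(name string) *widget { return &widget{name: name} }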
@@ -303,6 +308,7 @@ linters:
     - misspell
     - staticcheck
     - stylecheck
+    - revive
     - typecheck
     - unparam
    - unused
@@ -47,11 +47,11 @@ type Policies struct {
 func unmarshalResourcePolicies(yamlData *string) (*resourcePolicies, error) {
 	resPolicies := &resourcePolicies{}
-	if err := decodeStruct(strings.NewReader(*yamlData), resPolicies); err != nil {
+	err := decodeStruct(strings.NewReader(*yamlData), resPolicies)
+	if err != nil {
 		return nil, fmt.Errorf("failed to decode yaml data into resource policies %v", err)
-	} else {
-		return resPolicies, nil
 	}
+	return resPolicies, nil
 }
 
 func (p *Policies) buildPolicy(resPolicies *resourcePolicies) error {
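Note: the rewrite above follows revive's indent-error-flow guidance: when an if branch ends in a return, the trailing else only adds indentation. A minimal hypothetical sketch of the before/after shape:

    // flagged: superfluous else after a return
    if err != nil {
        return nil, err
    } else {
        return result, nil
    }

    // preferred: early return, happy path outdented
    if err != nil {
        return nil, err
    }
    return result, nil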
@@ -152,21 +152,22 @@ func parseCapacity(cap string) (*capacity, error) {
 	var quantities []resource.Quantity
 	if len(capacities) != 2 {
 		return nil, fmt.Errorf("wrong format of Capacity %v", cap)
-	} else {
-		for _, v := range capacities {
-			if strings.TrimSpace(v) == "" {
-				// case similar "10Gi,"
-				// if empty, the quantity will assigned with 0
-				quantities = append(quantities, *resource.NewQuantity(int64(0), resource.DecimalSI))
-			} else {
-				if quantity, err := resource.ParseQuantity(strings.TrimSpace(v)); err != nil {
-					return nil, fmt.Errorf("wrong format of Capacity %v with err %v", v, err)
-				} else {
-					quantities = append(quantities, quantity)
-				}
-			}
-		}
 	}
+
+	for _, v := range capacities {
+		if strings.TrimSpace(v) == "" {
+			// case similar "10Gi,"
+			// if empty, the quantity will assigned with 0
+			quantities = append(quantities, *resource.NewQuantity(int64(0), resource.DecimalSI))
+		} else {
+			quantity, err := resource.ParseQuantity(strings.TrimSpace(v))
+			if err != nil {
+				return nil, fmt.Errorf("wrong format of Capacity %v with err %v", v, err)
+			}
+			quantities = append(quantities, quantity)
+		}
+	}
 
 	return &capacity{lower: quantities[0], upper: quantities[1]}, nil
 }
@@ -40,9 +40,8 @@ func GetVersionedItemFilePath(rootDir, groupResource, namespace, name, versionPa
 func GetScopeDir(namespace string) string {
 	if namespace == "" {
 		return velerov1api.ClusterScopedDir
-	} else {
-		return velerov1api.NamespaceScopedDir
 	}
+	return velerov1api.NamespaceScopedDir
 }
 
 // Unmarshal reads the specified file, unmarshals the JSON contained within it
@@ -349,7 +349,7 @@ func (ib *itemBackupper) executeActions(
 		// ignore during finalize phase
 		if operationID != "" {
 			if finalize {
-				return nil, itemFiles, errors.New(fmt.Sprintf("Backup Item Action created operation during finalize (groupResource=%s, namespace=%s, name=%s)", groupResource.String(), namespace, name))
+				return nil, itemFiles, fmt.Errorf("backup Item Action created operation during finalize (groupResource=%s, namespace=%s, name=%s)", groupResource.String(), namespace, name)
 			}
 			resourceIdentifier := velero.ResourceIdentifier{
 				GroupResource: groupResource,
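Note: this change folds errors.New(fmt.Sprintf(...)) into a single fmt.Errorf call and lowercases the message, matching Go's convention that error strings are not capitalized. A minimal hypothetical sketch:

    err := errors.New(fmt.Sprintf("Pod %s not found", name)) // flagged
    err = fmt.Errorf("pod %s not found", name)               // preferred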
@@ -1,25 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 the Velero contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package client
|
||||
|
||||
// Make sure we import the client-go auth provider plugins.
|
||||
|
||||
import (
|
||||
_ "k8s.io/client-go/plugin/pkg/client/auth/azure"
|
||||
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
|
||||
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
|
||||
)
|
||||
@@ -40,8 +40,8 @@ import (
 	kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
 )
 
-// InstallOptions collects all the options for installing Velero into a Kubernetes cluster.
-type InstallOptions struct {
+// Options collects all the options for installing Velero into a Kubernetes cluster.
+type Options struct {
 	Namespace  string
 	Image      string
 	BucketName string
@@ -82,7 +82,7 @@ type InstallOptions struct {
 }
 
 // BindFlags adds command line values to the options struct.
-func (o *InstallOptions) BindFlags(flags *pflag.FlagSet) {
+func (o *Options) BindFlags(flags *pflag.FlagSet) {
 	flags.StringVar(&o.ProviderName, "provider", o.ProviderName, "Provider name for backup and volume storage")
 	flags.StringVar(&o.BucketName, "bucket", o.BucketName, "Name of the object storage bucket where backups should be stored")
 	flags.StringVar(&o.SecretFile, "secret-file", o.SecretFile, "File containing credentials for backup and volume provider. If not specified, --no-secret must be used for confirmation. Optional.")
@@ -121,8 +121,8 @@ func (o *InstallOptions) BindFlags(flags *pflag.FlagSet) {
 }
 
 // NewInstallOptions instantiates a new, default InstallOptions struct.
-func NewInstallOptions() *InstallOptions {
-	return &InstallOptions{
+func NewInstallOptions() *Options {
+	return &Options{
 		Namespace:           velerov1api.DefaultNamespace,
 		Image:               velero.DefaultVeleroImage(),
 		BackupStorageConfig: flag.NewMap(),
@@ -148,7 +148,7 @@ func NewInstallOptions() *InstallOptions {
 }
 
 // AsVeleroOptions translates the values provided at the command line into values used to instantiate Kubernetes resources
-func (o *InstallOptions) AsVeleroOptions() (*install.VeleroOptions, error) {
+func (o *Options) AsVeleroOptions() (*install.VeleroOptions, error) {
 	var secretData []byte
 	if o.SecretFile != "" && !o.NoSecret {
 		realPath, err := filepath.Abs(o.SecretFile)
@@ -264,7 +264,7 @@ This is useful as a starting point for more customized installations.
 }
 
 // Run executes a command in the context of the provided arguments.
-func (o *InstallOptions) Run(c *cobra.Command, f client.Factory) error {
+func (o *Options) Run(c *cobra.Command, f client.Factory) error {
 	var resources *unstructured.UnstructuredList
 	if o.CRDsOnly {
 		resources = install.AllCRDs()
@@ -327,13 +327,13 @@ func (o *InstallOptions) Run(c *cobra.Command, f client.Factory) error {
 }
 
 // Complete completes options for a command.
-func (o *InstallOptions) Complete(args []string, f client.Factory) error {
+func (o *Options) Complete(args []string, f client.Factory) error {
 	o.Namespace = f.Namespace()
 	return nil
 }
 
 // Validate validates options provided to a command.
-func (o *InstallOptions) Validate(c *cobra.Command, args []string, f client.Factory) error {
+func (o *Options) Validate(c *cobra.Command, args []string, f client.Factory) error {
 	if err := output.ValidateFlags(c); err != nil {
 		return err
 	}
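Note: InstallOptions lives in an install package, so the old name read as install.InstallOptions at call sites; revive's exported rule flags this kind of package-name stutter, and the same reasoning drives the Getter, Ensurer, and Progress renames later in this diff. A hypothetical sketch:

    import "github.com/vmware-tanzu/velero/pkg/cmd/cli/install"

    var before *install.InstallOptions // stutters at the call site
    var after *install.Options         // reads cleanly after the rename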
@@ -311,7 +311,7 @@ func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) {
 			continue
 		}
 
-		if err := controller.UpdatePVBStatusToFailed(client, s.ctx, &pvbs.Items[i],
+		if err := controller.UpdatePVBStatusToFailed(s.ctx, client, &pvbs.Items[i],
 			fmt.Sprintf("get a podvolumebackup with status %q during the server starting, mark it as %q", velerov1api.PodVolumeBackupPhaseInProgress, velerov1api.PodVolumeBackupPhaseFailed),
 			time.Now()); err != nil {
 			s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumebackup %q", pvb.GetName())
@@ -347,7 +347,7 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) {
 			continue
 		}
 
-		if err := controller.UpdatePVRStatusToFailed(client, s.ctx, &pvrs.Items[i],
+		if err := controller.UpdatePVRStatusToFailed(s.ctx, client, &pvrs.Items[i],
 			fmt.Sprintf("get a podvolumerestore with status %q during the server starting, mark it as %q", velerov1api.PodVolumeRestorePhaseInProgress, velerov1api.PodVolumeRestorePhaseFailed),
 			time.Now()); err != nil {
 			s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumerestore %q", pvr.GetName())
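Note: these call sites change because the helpers' signatures are reordered later in this diff to satisfy revive's context-as-argument rule, which wants context.Context as the first parameter. A hypothetical sketch of the signature shape:

    // flagged: ctx buried in the middle of the parameter list
    func UpdateStatus(c client.Client, ctx context.Context, obj client.Object) error

    // preferred: ctx first
    func UpdateStatus(ctx context.Context, c client.Client, obj client.Object) error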
@@ -28,7 +28,7 @@ import (
 	"github.com/vmware-tanzu/velero/pkg/builder"
 )
 
-type ServerStatusGetter interface {
+type Getter interface {
 	GetServerStatus(kbClient kbclient.Client) (*velerov1api.ServerStatusRequest, error)
 }
@@ -65,7 +65,7 @@ func NewCommand(f client.Factory) *cobra.Command {
 	return c
 }
 
-func printVersion(w io.Writer, clientOnly bool, kbClient kbclient.Client, serverStatusGetter serverstatus.ServerStatusGetter) {
+func printVersion(w io.Writer, clientOnly bool, kbClient kbclient.Client, serverStatusGetter serverstatus.Getter) {
 	fmt.Fprintln(w, "Client:")
 	fmt.Fprintf(w, "\tVersion: %s\n", buildinfo.Version)
 	fmt.Fprintf(w, "\tGit commit: %s\n", buildinfo.FormattedGitSHA())
@@ -253,7 +253,7 @@ type server struct {
 	pluginRegistry process.Registry
 	repoManager    repository.Manager
 	repoLocker     *repository.RepoLocker
-	repoEnsurer    *repository.RepositoryEnsurer
+	repoEnsurer    *repository.Ensurer
 	metrics        *metrics.ServerMetrics
 	config         serverConfig
 	mgr            manager.Manager
@@ -470,7 +470,7 @@ func (s *server) veleroResourcesExist() error {
 	}
 
 	if veleroGroupVersion == nil {
-		return errors.Errorf("Velero API group %s not found. Apply examples/common/00-prereqs.yaml to create it.", velerov1api.SchemeGroupVersion)
+		return fmt.Errorf("velero API group %s not found. Apply examples/common/00-prereqs.yaml to create it", velerov1api.SchemeGroupVersion)
 	}
 
 	foundResources := sets.NewString()
@@ -558,7 +558,7 @@ var defaultRestorePriorities = restore.Priorities{
 
 func (s *server) checkNodeAgent() {
 	// warn if node agent does not exist
-	if err := nodeagent.IsRunning(s.ctx, s.kubeClient, s.namespace); err == nodeagent.DaemonsetNotFound {
+	if err := nodeagent.IsRunning(s.ctx, s.kubeClient, s.namespace); err == nodeagent.ErrDaemonSetNotFound {
 		s.logger.Warn("Velero node agent not found; pod volume backups/restores will not work until it's created")
 	} else if err != nil {
 		s.logger.WithError(errors.WithStack(err)).Warn("Error checking for existence of velero node agent")
@@ -567,7 +567,7 @@ func (s *server) checkNodeAgent() {
 
 func (s *server) initRepoManager() error {
 	// warn if node agent does not exist
-	if err := nodeagent.IsRunning(s.ctx, s.kubeClient, s.namespace); err == nodeagent.DaemonsetNotFound {
+	if err := nodeagent.IsRunning(s.ctx, s.kubeClient, s.namespace); err == nodeagent.ErrDaemonSetNotFound {
 		s.logger.Warn("Velero node agent not found; pod volume backups/restores will not work until it's created")
 	} else if err != nil {
 		s.logger.WithError(errors.WithStack(err)).Warn("Error checking for existence of velero node agent")
@@ -579,7 +579,7 @@ func (s *server) initRepoManager() error {
 	}
 
 	s.repoLocker = repository.NewRepoLocker()
-	s.repoEnsurer = repository.NewRepositoryEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout)
+	s.repoEnsurer = repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout)
 
 	s.repoManager = repository.NewManager(s.namespace, s.mgr.GetClient(), s.repoLocker, s.repoEnsurer, s.credentialFileStore, s.credentialSecretStore, s.logger)
@@ -939,8 +939,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
 
 	if _, ok := enabledRuntimeControllers[controller.ServerStatusRequest]; ok {
 		if err := controller.NewServerStatusRequestReconciler(
-			s.mgr.GetClient(),
 			s.ctx,
+			s.mgr.GetClient(),
 			s.pluginRegistry,
 			clock.RealClock{},
 			s.logger,
@@ -838,47 +838,47 @@ func persistBackup(backup *pkgbackup.Request,
 	persistErrs := []error{}
 	backupJSON := new(bytes.Buffer)
 
-	if err := encode.EncodeTo(backup.Backup, "json", backupJSON); err != nil {
+	if err := encode.To(backup.Backup, "json", backupJSON); err != nil {
 		persistErrs = append(persistErrs, errors.Wrap(err, "error encoding backup"))
 	}
 
 	// Velero-native volume snapshots (as opposed to CSI ones)
-	nativeVolumeSnapshots, errs := encode.EncodeToJSONGzip(backup.VolumeSnapshots, "native volumesnapshots list")
+	nativeVolumeSnapshots, errs := encode.ToJSONGzip(backup.VolumeSnapshots, "native volumesnapshots list")
 	if errs != nil {
 		persistErrs = append(persistErrs, errs...)
 	}
 
 	var backupItemOperations *bytes.Buffer
-	backupItemOperations, errs = encode.EncodeToJSONGzip(backup.GetItemOperationsList(), "backup item operations list")
+	backupItemOperations, errs = encode.ToJSONGzip(backup.GetItemOperationsList(), "backup item operations list")
 	if errs != nil {
 		persistErrs = append(persistErrs, errs...)
 	}
 
-	podVolumeBackups, errs := encode.EncodeToJSONGzip(backup.PodVolumeBackups, "pod volume backups list")
+	podVolumeBackups, errs := encode.ToJSONGzip(backup.PodVolumeBackups, "pod volume backups list")
 	if errs != nil {
 		persistErrs = append(persistErrs, errs...)
 	}
 
-	csiSnapshotJSON, errs := encode.EncodeToJSONGzip(csiVolumeSnapshots, "csi volume snapshots list")
+	csiSnapshotJSON, errs := encode.ToJSONGzip(csiVolumeSnapshots, "csi volume snapshots list")
 	if errs != nil {
 		persistErrs = append(persistErrs, errs...)
 	}
 
-	csiSnapshotContentsJSON, errs := encode.EncodeToJSONGzip(csiVolumeSnapshotContents, "csi volume snapshot contents list")
+	csiSnapshotContentsJSON, errs := encode.ToJSONGzip(csiVolumeSnapshotContents, "csi volume snapshot contents list")
 	if errs != nil {
 		persistErrs = append(persistErrs, errs...)
 	}
-	csiSnapshotClassesJSON, errs := encode.EncodeToJSONGzip(csiVolumesnapshotClasses, "csi volume snapshot classes list")
+	csiSnapshotClassesJSON, errs := encode.ToJSONGzip(csiVolumesnapshotClasses, "csi volume snapshot classes list")
 	if errs != nil {
 		persistErrs = append(persistErrs, errs...)
 	}
 
-	backupResourceList, errs := encode.EncodeToJSONGzip(backup.BackupResourceList(), "backup resources list")
+	backupResourceList, errs := encode.ToJSONGzip(backup.BackupResourceList(), "backup resources list")
 	if errs != nil {
 		persistErrs = append(persistErrs, errs...)
 	}
 
-	backupResult, errs := encode.EncodeToJSONGzip(results, "backup results")
+	backupResult, errs := encode.ToJSONGzip(results, "backup results")
 	if errs != nil {
 		persistErrs = append(persistErrs, errs...)
 	}
@@ -188,7 +188,7 @@ func (r *backupFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 
 	// update backup metadata in object store
 	backupJSON := new(bytes.Buffer)
-	if err := encode.EncodeTo(backup, "json", backupJSON); err != nil {
+	if err := encode.To(backup, "json", backupJSON); err != nil {
 		return ctrl.Result{}, errors.Wrap(err, "error encoding backup json")
 	}
 	err = backupStore.PutBackupMetadata(backup.Name, backupJSON)
@@ -243,7 +243,7 @@ func (c *backupOperationsReconciler) updateBackupAndOperationsJSON(
 	// update file store
 	if backupStore != nil {
 		backupJSON := new(bytes.Buffer)
-		if err := encode.EncodeTo(backup, "json", backupJSON); err != nil {
+		if err := encode.To(backup, "json", backupJSON); err != nil {
 			removeIfComplete = false
 			return errors.Wrap(err, "error encoding backup json")
 		}
@@ -251,17 +251,17 @@ func (r *BackupRepoReconciler) getRepositoryMaintenanceFrequency(req *velerov1ap
 	if r.maintenanceFrequency > 0 {
 		r.logger.WithField("frequency", r.maintenanceFrequency).Info("Set user defined maintenance frequency")
 		return r.maintenanceFrequency
-	} else {
-		frequency, err := r.repositoryManager.DefaultMaintenanceFrequency(req)
-		if err != nil || frequency <= 0 {
-			r.logger.WithError(err).WithField("returned frequency", frequency).Warn("Failed to get maitanance frequency, use the default one")
-			frequency = defaultMaintainFrequency
-		} else {
-			r.logger.WithField("frequency", frequency).Info("Set matainenance according to repository suggestion")
-		}
-
-		return frequency
 	}
+
+	frequency, err := r.repositoryManager.DefaultMaintenanceFrequency(req)
+	if err != nil || frequency <= 0 {
+		r.logger.WithError(err).WithField("returned frequency", frequency).Warn("Failed to get maitanance frequency, use the default one")
+		frequency = defaultMaintainFrequency
+	} else {
+		r.logger.WithField("frequency", frequency).Info("Set matainenance according to repository suggestion")
+	}
+
+	return frequency
 }
 
 // ensureRepo calls repo manager's PrepareRepo to ensure the repo is ready for use.
@@ -183,7 +183,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 		}
 	}()
 
-	snapshotID, emptySnapshot, err := uploaderProv.RunBackup(ctx, path, pvb.Spec.Tags, parentSnapshotID, r.NewBackupProgressUpdater(&pvb, log, ctx))
+	snapshotID, emptySnapshot, err := uploaderProv.RunBackup(ctx, path, pvb.Spec.Tags, parentSnapshotID, r.NewBackupProgressUpdater(ctx, &pvb, log))
 	if err != nil {
 		return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("running backup, stderr=%v", err), log)
 	}
@@ -280,14 +280,14 @@ func (r *PodVolumeBackupReconciler) getParentSnapshot(ctx context.Context, log l
 }
 
 func (r *PodVolumeBackupReconciler) updateStatusToFailed(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
-	if err = UpdatePVBStatusToFailed(r.Client, ctx, pvb, errors.WithMessage(err, msg).Error(), r.Clock.Now()); err != nil {
+	if err = UpdatePVBStatusToFailed(ctx, r.Client, pvb, errors.WithMessage(err, msg).Error(), r.Clock.Now()); err != nil {
 		log.WithError(err).Error("error updating PodVolumeBackup status")
 		return ctrl.Result{}, err
 	}
 	return ctrl.Result{}, nil
 }
 
-func UpdatePVBStatusToFailed(c client.Client, ctx context.Context, pvb *velerov1api.PodVolumeBackup, errString string, time time.Time) error {
+func UpdatePVBStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1api.PodVolumeBackup, errString string, time time.Time) error {
 	original := pvb.DeepCopy()
 	pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
 	pvb.Status.Message = errString
@@ -296,12 +296,12 @@ func UpdatePVBStatusToFailed(c client.Client, ctx context.Context, pvb *velerov1
 	return c.Patch(ctx, pvb, client.MergeFrom(original))
 }
 
-func (r *PodVolumeBackupReconciler) NewBackupProgressUpdater(pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger, ctx context.Context) *BackupProgressUpdater {
+func (r *PodVolumeBackupReconciler) NewBackupProgressUpdater(ctx context.Context, pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger) *BackupProgressUpdater {
 	return &BackupProgressUpdater{pvb, log, ctx, r.Client}
 }
 
 // UpdateProgress which implement ProgressUpdater interface to update pvb progress status
-func (b *BackupProgressUpdater) UpdateProgress(p *uploader.UploaderProgress) {
+func (b *BackupProgressUpdater) UpdateProgress(p *uploader.Progress) {
 	original := b.PodVolumeBackup.DeepCopy()
 	b.PodVolumeBackup.Status.Progress = velerov1api.PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}
 	if b.Cli == nil {
@@ -122,7 +122,7 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req
 	}
 
 	if err = c.processRestore(ctx, pvr, pod, log); err != nil {
-		if e := UpdatePVRStatusToFailed(c, ctx, pvr, err.Error(), c.clock.Now()); e != nil {
+		if e := UpdatePVRStatusToFailed(ctx, c, pvr, err.Error(), c.clock.Now()); e != nil {
 			log.WithError(err).Error("Unable to update status to failed")
 		}
@@ -141,7 +141,7 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req
 	return ctrl.Result{}, nil
 }
 
-func UpdatePVRStatusToFailed(c client.Client, ctx context.Context, pvr *velerov1api.PodVolumeRestore, errString string, time time.Time) error {
+func UpdatePVRStatusToFailed(ctx context.Context, c client.Client, pvr *velerov1api.PodVolumeRestore, errString string, time time.Time) error {
 	original := pvr.DeepCopy()
 	pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed
 	pvr.Status.Message = errString
@@ -278,7 +278,7 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve
 		}
 	}()
 
-	if err = uploaderProv.RunRestore(ctx, req.Spec.SnapshotID, volumePath, c.NewRestoreProgressUpdater(req, log, ctx)); err != nil {
+	if err = uploaderProv.RunRestore(ctx, req.Spec.SnapshotID, volumePath, c.NewRestoreProgressUpdater(ctx, req, log)); err != nil {
 		return errors.Wrapf(err, "error running restore err=%v", err)
 	}
@@ -314,12 +314,12 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve
 	return nil
 }
 
-func (c *PodVolumeRestoreReconciler) NewRestoreProgressUpdater(pvr *velerov1api.PodVolumeRestore, log logrus.FieldLogger, ctx context.Context) *RestoreProgressUpdater {
+func (c *PodVolumeRestoreReconciler) NewRestoreProgressUpdater(ctx context.Context, pvr *velerov1api.PodVolumeRestore, log logrus.FieldLogger) *RestoreProgressUpdater {
 	return &RestoreProgressUpdater{pvr, log, ctx, c.Client}
 }
 
 // UpdateProgress which implement ProgressUpdater interface to update pvr progress status
-func (c *RestoreProgressUpdater) UpdateProgress(p *uploader.UploaderProgress) {
+func (c *RestoreProgressUpdater) UpdateProgress(p *uploader.Progress) {
 	original := c.PodVolumeRestore.DeepCopy()
 	c.PodVolumeRestore.Status.Progress = velerov1api.PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}
 	if c.Cli == nil {
@@ -58,8 +58,8 @@ type serverStatusRequestReconciler struct {
 
 // NewServerStatusRequestReconciler initializes and returns serverStatusRequestReconciler struct.
 func NewServerStatusRequestReconciler(
-	client client.Client,
 	ctx context.Context,
+	client client.Client,
 	pluginRegistry PluginLister,
 	clock clocks.WithTickerAndDelayedExecution,
 	log logrus.FieldLogger) *serverStatusRequestReconciler {
@@ -63,8 +63,8 @@ var _ = Describe("Server Status Request Reconciler", func() {
 			// Setup reconciler
 			Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed())
 			r := NewServerStatusRequestReconciler(
-				fake.NewClientBuilder().WithScheme(scheme.Scheme).WithObjects(test.req).Build(),
 				context.Background(),
+				fake.NewClientBuilder().WithScheme(scheme.Scheme).WithObjects(test.req).Build(),
 				test.reqPluginLister,
 				testclocks.NewFakeClock(now),
 				velerotest.NewLogger(),
@@ -200,9 +200,8 @@ func DeploymentIsReady(factory client.DynamicFactory, namespace string) (bool, e
 		if readyObservations > 4 {
 			isReady = true
 			return true, nil
-		} else {
-			return false, nil
 		}
+		return false, nil
 	})
 	return isReady, err
 }
@@ -247,9 +246,8 @@ func DaemonSetIsReady(factory client.DynamicFactory, namespace string) (bool, er
 		if readyObservations > 4 {
 			isReady = true
 			return true, nil
-		} else {
-			return false, nil
 		}
+		return false, nil
 	})
 	return isReady, err
 }
@@ -154,7 +154,7 @@ func (m *OperationsForBackup) DeepCopyInto(out *OperationsForBackup) {
 func (m *OperationsForBackup) uploadProgress(backupStore persistence.BackupStore, backupName string) error {
 	if len(m.Operations) > 0 {
 		var backupItemOperations *bytes.Buffer
-		backupItemOperations, errs := encode.EncodeToJSONGzip(m.Operations, "backup item operations list")
+		backupItemOperations, errs := encode.ToJSONGzip(m.Operations, "backup item operations list")
 		if errs != nil {
 			return errors.Wrap(errs[0], "error encoding item operations json")
 		}
@@ -154,7 +154,7 @@ func (m *OperationsForRestore) DeepCopyInto(out *OperationsForRestore) {
 func (m *OperationsForRestore) uploadProgress(backupStore persistence.BackupStore, restoreName string) error {
 	if len(m.Operations) > 0 {
 		var restoreItemOperations *bytes.Buffer
-		restoreItemOperations, errs := encode.EncodeToJSONGzip(m.Operations, "restore item operations list")
+		restoreItemOperations, errs := encode.ToJSONGzip(m.Operations, "restore item operations list")
 		if errs != nil {
 			return errors.Wrap(errs[0], "error encoding item operations json")
 		}
@@ -36,13 +36,13 @@ const (
 )
 
 var (
-	DaemonsetNotFound = errors.New("daemonset not found")
+	ErrDaemonSetNotFound = errors.New("daemonset not found")
 )
 
 // IsRunning checks if the node agent daemonset is running properly. If not, return the error found
 func IsRunning(ctx context.Context, kubeClient kubernetes.Interface, namespace string) error {
 	if _, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, daemonSet, metav1.GetOptions{}); apierrors.IsNotFound(err) {
-		return DaemonsetNotFound
+		return ErrDaemonSetNotFound
 	} else if err != nil {
 		return err
 	} else {
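Note: revive's error-naming rule wants exported sentinel error variables to carry an Err prefix, hence DaemonsetNotFound becoming ErrDaemonSetNotFound. Callers keep comparing against the sentinel as before, for example:

    if err := nodeagent.IsRunning(ctx, kubeClient, namespace); err == nodeagent.ErrDaemonSetNotFound {
        // daemonset absent: warn and continue rather than fail
    }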
@@ -48,7 +48,7 @@ type Backupper interface {
 type backupper struct {
 	ctx          context.Context
 	repoLocker   *repository.RepoLocker
-	repoEnsurer  *repository.RepositoryEnsurer
+	repoEnsurer  *repository.Ensurer
 	veleroClient clientset.Interface
 	pvcClient    corev1client.PersistentVolumeClaimsGetter
 	pvClient     corev1client.PersistentVolumesGetter
@@ -62,7 +62,7 @@ type backupper struct {
 func newBackupper(
 	ctx context.Context,
 	repoLocker *repository.RepoLocker,
-	repoEnsurer *repository.RepositoryEnsurer,
+	repoEnsurer *repository.Ensurer,
 	podVolumeBackupInformer cache.SharedIndexInformer,
 	veleroClient clientset.Interface,
 	pvcClient corev1client.PersistentVolumeClaimsGetter,
@@ -40,7 +40,7 @@ type BackupperFactory interface {
 
 func NewBackupperFactory(
 	repoLocker *repository.RepoLocker,
-	repoEnsurer *repository.RepositoryEnsurer,
+	repoEnsurer *repository.Ensurer,
 	veleroClient clientset.Interface,
 	pvcClient corev1client.PersistentVolumeClaimsGetter,
 	pvClient corev1client.PersistentVolumesGetter,
@@ -60,7 +60,7 @@ func NewBackupperFactory(
 
 type backupperFactory struct {
 	repoLocker   *repository.RepoLocker
-	repoEnsurer  *repository.RepositoryEnsurer
+	repoEnsurer  *repository.Ensurer
 	veleroClient clientset.Interface
 	pvcClient    corev1client.PersistentVolumeClaimsGetter
 	pvClient     corev1client.PersistentVolumesGetter
@@ -55,7 +55,7 @@ type Restorer interface {
 type restorer struct {
 	ctx          context.Context
 	repoLocker   *repository.RepoLocker
-	repoEnsurer  *repository.RepositoryEnsurer
+	repoEnsurer  *repository.Ensurer
 	veleroClient clientset.Interface
 	pvcClient    corev1client.PersistentVolumeClaimsGetter
 	podClient    corev1client.PodsGetter
@@ -70,7 +70,7 @@ type restorer struct {
 func newRestorer(
 	ctx context.Context,
 	repoLocker *repository.RepoLocker,
-	repoEnsurer *repository.RepositoryEnsurer,
+	repoEnsurer *repository.Ensurer,
 	podVolumeRestoreInformer cache.SharedIndexInformer,
 	veleroClient clientset.Interface,
 	pvcClient corev1client.PersistentVolumeClaimsGetter,
@@ -194,9 +194,8 @@ func (r *restorer) RestorePodVolumes(data RestoreData) []error {
 			err = kube.IsPodScheduled(newObj)
 			if err != nil {
 				return false, nil
-			} else {
-				return true, nil
 			}
+			return true, nil
 		}
 
 		err := wait.PollWithContext(checkCtx, time.Millisecond*500, time.Minute*10, checkFunc)
@@ -40,7 +40,7 @@ type RestorerFactory interface {
 }
 
 func NewRestorerFactory(repoLocker *repository.RepoLocker,
-	repoEnsurer *repository.RepositoryEnsurer,
+	repoEnsurer *repository.Ensurer,
 	veleroClient clientset.Interface,
 	pvcClient corev1client.PersistentVolumeClaimsGetter,
 	podClient corev1client.PodsGetter,
@@ -59,7 +59,7 @@ func NewRestorerFactory(repoLocker *repository.RepoLocker,
 
 type restorerFactory struct {
 	repoLocker   *repository.RepoLocker
-	repoEnsurer  *repository.RepositoryEnsurer
+	repoEnsurer  *repository.Ensurer
 	veleroClient clientset.Interface
 	pvcClient    corev1client.PersistentVolumeClaimsGetter
 	podClient    corev1client.PodsGetter
@@ -30,8 +30,8 @@ import (
 	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
 )
 
-// RepositoryEnsurer ensures that backup repositories are created and ready.
-type RepositoryEnsurer struct {
+// Ensurer ensures that backup repositories are created and ready.
+type Ensurer struct {
 	log        logrus.FieldLogger
 	repoClient client.Client
@@ -42,8 +42,8 @@ type RepositoryEnsurer struct {
 	resourceTimeout time.Duration
 }
 
-func NewRepositoryEnsurer(repoClient client.Client, log logrus.FieldLogger, resourceTimeout time.Duration) *RepositoryEnsurer {
-	return &RepositoryEnsurer{
+func NewEnsurer(repoClient client.Client, log logrus.FieldLogger, resourceTimeout time.Duration) *Ensurer {
+	return &Ensurer{
 		log:        log,
 		repoClient: repoClient,
 		repoLocks:  make(map[BackupRepositoryKey]*sync.Mutex),
@@ -51,7 +51,7 @@ func NewRepositoryEnsurer(repoClient client.Client, log logrus.FieldLogger, reso
 	}
 }
 
-func (r *RepositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNamespace, backupLocation, repositoryType string) (*velerov1api.BackupRepository, error) {
+func (r *Ensurer) EnsureRepo(ctx context.Context, namespace, volumeNamespace, backupLocation, repositoryType string) (*velerov1api.BackupRepository, error) {
 	if volumeNamespace == "" || backupLocation == "" || repositoryType == "" {
 		return nil, errors.Errorf("wrong parameters, namespace %q, backup storage location %q, repository type %q", volumeNamespace, backupLocation, repositoryType)
 	}
@@ -96,7 +96,7 @@ func (r *RepositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNam
 	return r.createBackupRepositoryAndWait(ctx, namespace, backupRepoKey)
 }
 
-func (r *RepositoryEnsurer) repoLock(key BackupRepositoryKey) *sync.Mutex {
+func (r *Ensurer) repoLock(key BackupRepositoryKey) *sync.Mutex {
 	r.repoLocksMu.Lock()
 	defer r.repoLocksMu.Unlock()
@@ -107,7 +107,7 @@ func (r *RepositoryEnsurer) repoLock(key BackupRepositoryKey) *sync.Mutex {
 	return r.repoLocks[key]
 }
 
-func (r *RepositoryEnsurer) createBackupRepositoryAndWait(ctx context.Context, namespace string, backupRepoKey BackupRepositoryKey) (*velerov1api.BackupRepository, error) {
+func (r *Ensurer) createBackupRepositoryAndWait(ctx context.Context, namespace string, backupRepoKey BackupRepositoryKey) (*velerov1api.BackupRepository, error) {
 	toCreate := newBackupRepository(namespace, backupRepoKey)
 	if err := r.repoClient.Create(ctx, toCreate, &client.CreateOptions{}); err != nil {
 		return nil, errors.Wrap(err, "unable to create backup repository resource")
@@ -129,7 +129,7 @@ func (r *RepositoryEnsurer) createBackupRepositoryAndWait(ctx context.Context, n
 	err := wait.PollWithContext(ctx, time.Millisecond*500, r.resourceTimeout, checkFunc)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to wait BackupRepository")
-	} else {
-		return repo, nil
 	}
+
+	return repo, nil
 }
@@ -81,7 +81,7 @@ type manager struct {
 	providers   map[string]provider.Provider
 	client      client.Client
 	repoLocker  *RepoLocker
-	repoEnsurer *RepositoryEnsurer
+	repoEnsurer *Ensurer
 	fileSystem  filesystem.Interface
 	log         logrus.FieldLogger
 }
@@ -91,7 +91,7 @@ func NewManager(
 	namespace string,
 	client client.Client,
 	repoLocker *RepoLocker,
-	repoEnsurer *RepositoryEnsurer,
+	repoEnsurer *Ensurer,
 	credentialFileStore credentials.FileStore,
 	credentialSecretStore credentials.SecretStore,
 	log logrus.FieldLogger,
@@ -104,7 +104,7 @@ func RunBackup(backupCmd *Command, log logrus.FieldLogger, updater uploader.Prog
 		// if the line contains a non-empty bytes_done field, we can update the
 		// caller with the progress
 		if stat.BytesDone != 0 {
-			updater.UpdateProgress(&uploader.UploaderProgress{
+			updater.UpdateProgress(&uploader.Progress{
 				TotalBytes: stat.TotalBytes,
 				BytesDone:  stat.BytesDone,
 			})
@@ -136,7 +136,7 @@ func RunBackup(backupCmd *Command, log logrus.FieldLogger, updater uploader.Prog
 	}
 
 	// update progress to 100%
-	updater.UpdateProgress(&uploader.UploaderProgress{
+	updater.UpdateProgress(&uploader.Progress{
 		TotalBytes: stat.TotalBytesProcessed,
 		BytesDone:  stat.TotalBytesProcessed,
 	})
@@ -198,7 +198,7 @@ func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updater uploader.Pr
 		return "", "", errors.Wrap(err, "error getting snapshot size")
 	}
 
-	updater.UpdateProgress(&uploader.UploaderProgress{
+	updater.UpdateProgress(&uploader.Progress{
 		TotalBytes: snapshotSize,
 	})
@@ -217,7 +217,7 @@ func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updater uploader.Pr
 	}
 
 	if volumeSize != 0 {
-		updater.UpdateProgress(&uploader.UploaderProgress{
+		updater.UpdateProgress(&uploader.Progress{
 			TotalBytes: snapshotSize,
 			BytesDone:  volumeSize,
 		})
@@ -233,7 +233,7 @@ func RunRestore(restoreCmd *Command, log logrus.FieldLogger, updater uploader.Pr
 	quit <- struct{}{}
 
 	// update progress to 100%
-	updater.UpdateProgress(&uploader.UploaderProgress{
+	updater.UpdateProgress(&uploader.Progress{
 		TotalBytes: snapshotSize,
 		BytesDone:  snapshotSize,
 	})
@@ -41,13 +41,13 @@ func (t *Throttle) ShouldOutput() bool {
 	return false
 }
 
-func (p *KopiaProgress) InitThrottle(interval time.Duration) {
+func (p *Progress) InitThrottle(interval time.Duration) {
 	p.outputThrottle.throttle = 0
 	p.outputThrottle.interval = interval
 }
 
-// KopiaProgress represents a backup or restore counters.
-type KopiaProgress struct {
+// Progress represents a backup or restore counters.
+type Progress struct {
 	// all int64 must precede all int32 due to alignment requirements on ARM
 	// +checkatomic
 	uploadedBytes int64 //the total bytes has uploaded
@@ -69,7 +69,7 @@ type KopiaProgress struct {
 }
 
 // UploadedBytes the total bytes has uploaded currently
-func (p *KopiaProgress) UploadedBytes(numBytes int64) {
+func (p *Progress) UploadedBytes(numBytes int64) {
 	atomic.AddInt64(&p.uploadedBytes, numBytes)
 	atomic.AddInt32(&p.uploadedFiles, 1)
@@ -77,7 +77,7 @@ func (p *KopiaProgress) UploadedBytes(numBytes int64) {
 }
 
 // Error statistic the total Error has occurred
-func (p *KopiaProgress) Error(path string, err error, isIgnored bool) {
+func (p *Progress) Error(path string, err error, isIgnored bool) {
 	if isIgnored {
 		atomic.AddInt32(&p.ignoredErrorCount, 1)
 		p.Log.Warnf("Ignored error when processing %v: %v", path, err)
@@ -88,7 +88,7 @@ func (p *KopiaProgress) Error(path string, err error, isIgnored bool) {
 }
 
 // EstimatedDataSize statistic the total size of files to be processed and total files to be processed
-func (p *KopiaProgress) EstimatedDataSize(fileCount int, totalBytes int64) {
+func (p *Progress) EstimatedDataSize(fileCount int, totalBytes int64) {
 	atomic.StoreInt64(&p.estimatedTotalBytes, totalBytes)
 	atomic.StoreInt32(&p.estimatedFileCount, int32(fileCount))
@@ -96,57 +96,57 @@ func (p *KopiaProgress) EstimatedDataSize(fileCount int, totalBytes int64) {
 }
 
 // UpdateProgress which calls Updater UpdateProgress interface, update progress by third-party implementation
-func (p *KopiaProgress) UpdateProgress() {
+func (p *Progress) UpdateProgress() {
 	if p.outputThrottle.ShouldOutput() {
-		p.Updater.UpdateProgress(&uploader.UploaderProgress{TotalBytes: p.estimatedTotalBytes, BytesDone: p.processedBytes})
+		p.Updater.UpdateProgress(&uploader.Progress{TotalBytes: p.estimatedTotalBytes, BytesDone: p.processedBytes})
 	}
 }
 
 // UploadStarted statistic the total Error has occurred
-func (p *KopiaProgress) UploadStarted() {}
+func (p *Progress) UploadStarted() {}
 
 // CachedFile statistic the total bytes been cached currently
-func (p *KopiaProgress) CachedFile(fname string, numBytes int64) {
+func (p *Progress) CachedFile(fname string, numBytes int64) {
 	atomic.AddInt64(&p.cachedBytes, numBytes)
 	p.UpdateProgress()
 }
 
 // HashedBytes statistic the total bytes been hashed currently
-func (p *KopiaProgress) HashedBytes(numBytes int64) {
+func (p *Progress) HashedBytes(numBytes int64) {
 	atomic.AddInt64(&p.processedBytes, numBytes)
 	atomic.AddInt64(&p.hashededBytes, numBytes)
 	p.UpdateProgress()
 }
 
 // HashingFile statistic the file been hashed currently
-func (p *KopiaProgress) HashingFile(fname string) {}
+func (p *Progress) HashingFile(fname string) {}
 
 // ExcludedFile statistic the file been excluded currently
-func (p *KopiaProgress) ExcludedFile(fname string, numBytes int64) {}
+func (p *Progress) ExcludedFile(fname string, numBytes int64) {}
 
 // ExcludedDir statistic the dir been excluded currently
-func (p *KopiaProgress) ExcludedDir(dirname string) {}
+func (p *Progress) ExcludedDir(dirname string) {}
 
 // FinishedHashingFile which will called when specific file finished hash
-func (p *KopiaProgress) FinishedHashingFile(fname string, numBytes int64) {
+func (p *Progress) FinishedHashingFile(fname string, numBytes int64) {
 	p.UpdateProgress()
 }
 
 // StartedDirectory called when begin to upload one directory
-func (p *KopiaProgress) StartedDirectory(dirname string) {}
+func (p *Progress) StartedDirectory(dirname string) {}
 
 // FinishedDirectory called when finish to upload one directory
-func (p *KopiaProgress) FinishedDirectory(dirname string) {
+func (p *Progress) FinishedDirectory(dirname string) {
 	p.UpdateProgress()
 }
 
 // UploadFinished which report the files flushed after the Upload has completed.
-func (p *KopiaProgress) UploadFinished() {
+func (p *Progress) UploadFinished() {
 	p.UpdateProgress()
 }
 
 // ProgressBytes which statistic all bytes has been processed currently
-func (p *KopiaProgress) ProgressBytes(processedBytes int64, totalBytes int64) {
+func (p *Progress) ProgressBytes(processedBytes int64, totalBytes int64) {
 	atomic.StoreInt64(&p.processedBytes, processedBytes)
 	atomic.StoreInt64(&p.estimatedTotalBytes, totalBytes)
 	p.UpdateProgress()
@@ -274,7 +274,7 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour
 }
 
 // Restore restore specific sourcePath with given snapshotID and update progress
-func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
+func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
 	log.Info("Start to restore...")
 
 	kopiaCtx := logging.SetupKopiaLog(ctx, log)
@@ -119,7 +119,7 @@ func (kp *kopiaProvider) RunBackup(
 	})
 	repoWriter := kopia.NewShimRepo(kp.bkRepo)
 	kpUploader := snapshotfs.NewUploader(repoWriter)
-	progress := new(kopia.KopiaProgress)
+	progress := new(kopia.Progress)
 	progress.InitThrottle(backupProgressCheckInterval)
 	progress.Updater = updater
 	progress.Log = log
@@ -144,7 +144,7 @@ func (kp *kopiaProvider) RunBackup(
 
 	// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
 	updater.UpdateProgress(
-		&uploader.UploaderProgress{
+		&uploader.Progress{
 			TotalBytes: snapshotInfo.Size,
 			BytesDone:  snapshotInfo.Size,
 		},
@@ -177,7 +177,7 @@ func (kp *kopiaProvider) RunRestore(
 		"volumePath": volumePath,
 	})
 	repoWriter := kopia.NewShimRepo(kp.bkRepo)
-	prorgess := new(kopia.KopiaProgress)
+	prorgess := new(kopia.Progress)
 	prorgess.InitThrottle(restoreProgressCheckInterval)
 	prorgess.Updater = updater
 	restoreCancel := make(chan struct{})
@@ -200,7 +200,7 @@ func (kp *kopiaProvider) RunRestore(
 	}
 
 	// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
-	updater.UpdateProgress(&uploader.UploaderProgress{
+	updater.UpdateProgress(&uploader.Progress{
 		TotalBytes: size,
 		BytesDone:  size,
 	})
@@ -85,19 +85,19 @@ func TestRunRestore(t *testing.T) {
 
 	testCases := []struct {
 		name            string
-		hookRestoreFunc func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error)
+		hookRestoreFunc func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error)
 		notError        bool
 	}{
 		{
 			name: "normal restore",
-			hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
+			hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
 				return 0, 0, nil
 			},
 			notError: true,
 		},
 		{
 			name: "failed to restore",
-			hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.KopiaProgress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
+			hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
 				return 0, 0, errors.New("failed to restore")
 			},
 			notError: false,
@@ -124,7 +124,7 @@ type FakeBackupProgressUpdater struct {
 	Cli client.Client
 }
 
-func (f *FakeBackupProgressUpdater) UpdateProgress(p *uploader.UploaderProgress) {}
+func (f *FakeBackupProgressUpdater) UpdateProgress(p *uploader.Progress) {}
 
 type FakeRestoreProgressUpdater struct {
 	PodVolumeRestore *velerov1api.PodVolumeRestore
@@ -133,4 +133,4 @@ type FakeRestoreProgressUpdater struct {
 	Cli client.Client
 }
 
-func (f *FakeRestoreProgressUpdater) UpdateProgress(p *uploader.UploaderProgress) {}
+func (f *FakeRestoreProgressUpdater) UpdateProgress(p *uploader.Progress) {}
@@ -41,13 +41,13 @@ type SnapshotInfo struct {
 	Size int64 `json:"Size"`
 }
 
-// UploaderProgress which defined two variables to record progress
-type UploaderProgress struct {
+// Progress which defined two variables to record progress
+type Progress struct {
 	TotalBytes int64 `json:"totalBytes,omitempty"`
 	BytesDone  int64 `json:"doneBytes,omitempty"`
 }
 
 // UploaderProgress which defined generic interface to update progress
 type ProgressUpdater interface {
-	UpdateProgress(p *UploaderProgress)
+	UpdateProgress(p *Progress)
 }
@@ -35,15 +35,15 @@ import (
 func Encode(obj runtime.Object, format string) ([]byte, error) {
 	buf := new(bytes.Buffer)
 
-	if err := EncodeTo(obj, format, buf); err != nil {
+	if err := To(obj, format, buf); err != nil {
 		return nil, err
 	}
 	return buf.Bytes(), nil
 }
 
-// EncodeTo converts the provided object to the specified format and
+// To converts the provided object to the specified format and
 // writes the encoded data to the provided io.Writer.
-func EncodeTo(obj runtime.Object, format string, w io.Writer) error {
+func To(obj runtime.Object, format string, w io.Writer) error {
 	encoder, err := EncoderFor(format, obj)
 	if err != nil {
 		return err
@@ -73,8 +73,8 @@ func EncoderFor(format string, obj runtime.Object) (runtime.Encoder, error) {
 	return encoder, nil
 }
 
-// EncodeToJSONGzip takes arbitrary Go data and encodes it to GZip compressed JSON in a buffer, as well as a description of the data to put into an error should encoding fail.
-func EncodeToJSONGzip(data interface{}, desc string) (*bytes.Buffer, []error) {
+// ToJSONGzip takes arbitrary Go data and encodes it to GZip compressed JSON in a buffer, as well as a description of the data to put into an error should encoding fail.
+func ToJSONGzip(data interface{}, desc string) (*bytes.Buffer, []error) {
 	buf := new(bytes.Buffer)
 	gzw := gzip.NewWriter(buf)
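Note: after this rename the package qualifier carries the verb, so call sites read as encode.To and encode.ToJSONGzip instead of the stuttering encode.EncodeTo and encode.EncodeToJSONGzip. A minimal usage sketch mirroring the call sites updated earlier in this diff:

    backupJSON := new(bytes.Buffer)
    if err := encode.To(backup.Backup, "json", backupJSON); err != nil {
        persistErrs = append(persistErrs, errors.Wrap(err, "error encoding backup"))
    }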
@@ -30,7 +30,7 @@ import (
 	"google.golang.org/api/iterator"
 	"google.golang.org/api/option"
 
-	. "github.com/vmware-tanzu/velero/test/e2e"
+	"github.com/vmware-tanzu/velero/test/e2e"
 )
 
 type GCSStorage string
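Note: revive's dot-imports rule forbids import . aliases because they make identifiers look local; dropping the dot is why SnapshotCheckPoint becomes e2e.SnapshotCheckPoint in the next hunk. A hypothetical sketch:

    // flagged: where does SnapshotCheckPoint come from?
    //   import . "github.com/vmware-tanzu/velero/test/e2e"
    //   var cp SnapshotCheckPoint

    // preferred: qualified and explicit
    import "github.com/vmware-tanzu/velero/test/e2e"

    var cp e2e.SnapshotCheckPoint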
@@ -101,7 +101,7 @@ func (s GCSStorage) DeleteObjectsInBucket(cloudCredentialsFile, bslBucket, bslPr
 	}
 }
 
-func (s GCSStorage) IsSnapshotExisted(cloudCredentialsFile, bslConfig, backupObject string, snapshotCheck SnapshotCheckPoint) error {
+func (s GCSStorage) IsSnapshotExisted(cloudCredentialsFile, bslConfig, backupObject string, snapshotCheck e2e.SnapshotCheckPoint) error {
 	ctx := context.Background()
 	data, err := os.ReadFile(cloudCredentialsFile)
 	if err != nil {
@@ -48,7 +48,7 @@ const (
 
 // we provide more install options other than the standard install.InstallOptions in E2E test
 type installOptions struct {
-	*install.InstallOptions
+	*install.Options
 	RegistryCredentialFile string
 	RestoreHelperImage     string
 	VeleroServerDebugMode  bool
@@ -100,7 +100,7 @@ func VeleroInstall(ctx context.Context, veleroCfg *VeleroConfig) error {
 	veleroInstallOptions.GarbageCollectionFrequency = GCFrequency
 
 	err = installVeleroServer(ctx, veleroCfg.VeleroCLI, &installOptions{
-		InstallOptions:         veleroInstallOptions,
+		Options:                veleroInstallOptions,
 		RegistryCredentialFile: veleroCfg.RegistryCredentialFile,
 		RestoreHelperImage:     veleroCfg.RestoreHelperImage,
 		VeleroServerDebugMode:  veleroCfg.VeleroServerDebugMode,
@@ -144,7 +144,7 @@ func GetProviderPluginsByVersion(version, providerName, feature string) ([]strin
 
 // getProviderVeleroInstallOptions returns Velero InstallOptions for the provider.
 func getProviderVeleroInstallOptions(veleroCfg *VeleroConfig,
-	plugins []string) (*cliinstall.InstallOptions, error) {
+	plugins []string) (*cliinstall.Options, error) {
 
 	if veleroCfg.CloudCredentialsFile == "" {
 		return nil, errors.Errorf("No credentials were supplied to use for E2E tests")