Merge pull request #9357 from sseago/incremental-bytes2

Add incrementalBytes to DU/PVB for reporting new/changed size
This commit is contained in:
lyndon-li
2025-10-28 13:18:49 +08:00
committed by GitHub
30 changed files with 145 additions and 57 deletions

View File

@@ -0,0 +1 @@
Add incrementalSize to DU/PVB for reporting new/changed size

View File

@@ -33,6 +33,12 @@ spec:
jsonPath: .status.progress.totalBytes
name: Total Bytes
type: integer
- description: Incremental bytes
format: int64
jsonPath: .status.incrementalBytes
name: Incremental Bytes
priority: 10
type: integer
- description: Name of the Backup Storage Location where this backup should be
stored
jsonPath: .spec.backupStorageLocation
@@ -189,6 +195,11 @@ spec:
format: date-time
nullable: true
type: string
incrementalBytes:
description: IncrementalBytes holds the number of bytes new or changed
since the last backup
format: int64
type: integer
message:
description: Message is a message about the pod volume backup's status.
type: string

File diff suppressed because one or more lines are too long

View File

@@ -33,6 +33,12 @@ spec:
jsonPath: .status.progress.totalBytes
name: Total Bytes
type: integer
- description: Incremental bytes
format: int64
jsonPath: .status.incrementalBytes
name: Incremental Bytes
priority: 10
type: integer
- description: Name of the Backup Storage Location where this backup should be
stored
jsonPath: .spec.backupStorageLocation
@@ -173,6 +179,11 @@ spec:
as a result of the DataUpload.
nullable: true
type: object
incrementalBytes:
description: IncrementalBytes holds the number of bytes new or changed
since the last backup
format: int64
type: integer
message:
description: Message is a message about the DataUpload's status.
type: string

File diff suppressed because one or more lines are too long

View File

@@ -170,6 +170,9 @@ type SnapshotDataMovementInfo struct {
// Moved snapshot data size.
Size int64 `json:"size"`
// Moved snapshot incremental size.
IncrementalSize int64 `json:"incrementalSize,omitempty"`
// The DataUpload's Status.Phase value
Phase velerov2alpha1.DataUploadPhase
}
@@ -217,6 +220,9 @@ type PodVolumeInfo struct {
// The snapshot corresponding volume size.
Size int64 `json:"size,omitempty"`
// The incremental snapshot size.
IncrementalSize int64 `json:"incrementalSize,omitempty"`
// The type of the uploader that uploads the data. The valid values are `kopia` and `restic`.
UploaderType string `json:"uploaderType"`
@@ -240,14 +246,15 @@ type PodVolumeInfo struct {
func newPodVolumeInfoFromPVB(pvb *velerov1api.PodVolumeBackup) *PodVolumeInfo {
return &PodVolumeInfo{
SnapshotHandle: pvb.Status.SnapshotID,
Size: pvb.Status.Progress.TotalBytes,
UploaderType: pvb.Spec.UploaderType,
VolumeName: pvb.Spec.Volume,
PodName: pvb.Spec.Pod.Name,
PodNamespace: pvb.Spec.Pod.Namespace,
NodeName: pvb.Spec.Node,
Phase: pvb.Status.Phase,
SnapshotHandle: pvb.Status.SnapshotID,
Size: pvb.Status.Progress.TotalBytes,
IncrementalSize: pvb.Status.IncrementalBytes,
UploaderType: pvb.Spec.UploaderType,
VolumeName: pvb.Spec.Volume,
PodName: pvb.Spec.Pod.Name,
PodNamespace: pvb.Spec.Pod.Namespace,
NodeName: pvb.Spec.Node,
Phase: pvb.Status.Phase,
}
}

View File

@@ -118,6 +118,10 @@ type PodVolumeBackupStatus struct {
// +optional
Progress shared.DataMoveOperationProgress `json:"progress,omitempty"`
// IncrementalBytes holds the number of bytes new or changed since the last backup
// +optional
IncrementalBytes int64 `json:"incrementalBytes,omitempty"`
// AcceptedTimestamp records the time the pod volume backup is to be prepared.
// The server's time is used for AcceptedTimestamp
// +optional
@@ -134,6 +138,7 @@ type PodVolumeBackupStatus struct {
// +kubebuilder:printcolumn:name="Started",type="date",JSONPath=".status.startTimestamp",description="Time duration since this PodVolumeBackup was started"
// +kubebuilder:printcolumn:name="Bytes Done",type="integer",format="int64",JSONPath=".status.progress.bytesDone",description="Completed bytes"
// +kubebuilder:printcolumn:name="Total Bytes",type="integer",format="int64",JSONPath=".status.progress.totalBytes",description="Total bytes"
// +kubebuilder:printcolumn:name="Incremental Bytes",type="integer",format="int64",JSONPath=".status.incrementalBytes",description="Incremental bytes",priority=10
// +kubebuilder:printcolumn:name="Storage Location",type="string",JSONPath=".spec.backupStorageLocation",description="Name of the Backup Storage Location where this backup should be stored"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description="Time duration since this PodVolumeBackup was created"
// +kubebuilder:printcolumn:name="Node",type="string",JSONPath=".status.node",description="Name of the node where the PodVolumeBackup is processed"

View File

@@ -155,6 +155,10 @@ type DataUploadStatus struct {
// +optional
Progress shared.DataMoveOperationProgress `json:"progress,omitempty"`
// IncrementalBytes holds the number of bytes new or changed since the last backup
// +optional
IncrementalBytes int64 `json:"incrementalBytes,omitempty"`
// Node is name of the node where the DataUpload is processed.
// +optional
Node string `json:"node,omitempty"`
@@ -185,6 +189,7 @@ type DataUploadStatus struct {
// +kubebuilder:printcolumn:name="Started",type="date",JSONPath=".status.startTimestamp",description="Time duration since this DataUpload was started"
// +kubebuilder:printcolumn:name="Bytes Done",type="integer",format="int64",JSONPath=".status.progress.bytesDone",description="Completed bytes"
// +kubebuilder:printcolumn:name="Total Bytes",type="integer",format="int64",JSONPath=".status.progress.totalBytes",description="Total bytes"
// +kubebuilder:printcolumn:name="Incremental Bytes",type="integer",format="int64",JSONPath=".status.incrementalBytes",description="Incremental bytes",priority=10
// +kubebuilder:printcolumn:name="Storage Location",type="string",JSONPath=".spec.backupStorageLocation",description="Name of the Backup Storage Location where this backup should be stored"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description="Time duration since this DataUpload was created"
// +kubebuilder:printcolumn:name="Node",type="string",JSONPath=".status.node",description="Name of the node where the DataUpload is processed"

View File

@@ -1198,6 +1198,7 @@ func updateVolumeInfos(
volumeInfos[index].SnapshotDataMovementInfo.SnapshotHandle = dataUpload.Status.SnapshotID
volumeInfos[index].SnapshotDataMovementInfo.RetainedSnapshot = dataUpload.Spec.CSISnapshot.VolumeSnapshot
volumeInfos[index].SnapshotDataMovementInfo.Size = dataUpload.Status.Progress.TotalBytes
volumeInfos[index].SnapshotDataMovementInfo.IncrementalSize = dataUpload.Status.IncrementalBytes
volumeInfos[index].SnapshotDataMovementInfo.Phase = dataUpload.Status.Phase
if dataUpload.Status.Phase == velerov2alpha1.DataUploadPhaseCompleted {

View File

@@ -5578,6 +5578,7 @@ func TestUpdateVolumeInfos(t *testing.T) {
CSISnapshot(&velerov2alpha1.CSISnapshotSpec{VolumeSnapshot: "vs-1"}).
SnapshotID("snapshot-id").
Progress(shared.DataMoveOperationProgress{TotalBytes: 1000}).
IncrementalBytes(500).
Phase(velerov2alpha1.DataUploadPhaseFailed).
SourceNamespace("ns-1").
SourcePVC("pvc-1").
@@ -5603,6 +5604,7 @@ func TestUpdateVolumeInfos(t *testing.T) {
RetainedSnapshot: "vs-1",
SnapshotHandle: "snapshot-id",
Size: 1000,
IncrementalSize: 500,
Phase: velerov2alpha1.DataUploadPhaseFailed,
},
},
@@ -5616,6 +5618,7 @@ func TestUpdateVolumeInfos(t *testing.T) {
CSISnapshot(&velerov2alpha1.CSISnapshotSpec{VolumeSnapshot: "vs-1"}).
SnapshotID("snapshot-id").
Progress(shared.DataMoveOperationProgress{TotalBytes: 1000}).
IncrementalBytes(500).
Phase(velerov2alpha1.DataUploadPhaseCompleted).
SourceNamespace("ns-1").
SourcePVC("pvc-1").
@@ -5641,6 +5644,7 @@ func TestUpdateVolumeInfos(t *testing.T) {
RetainedSnapshot: "vs-1",
SnapshotHandle: "snapshot-id",
Size: 1000,
IncrementalSize: 500,
Phase: velerov2alpha1.DataUploadPhaseCompleted,
},
},
@@ -5655,6 +5659,7 @@ func TestUpdateVolumeInfos(t *testing.T) {
CSISnapshot(&velerov2alpha1.CSISnapshotSpec{VolumeSnapshot: "vs-1"}).
SnapshotID("snapshot-id").
Progress(shared.DataMoveOperationProgress{TotalBytes: 1000}).
IncrementalBytes(500).
Phase(velerov2alpha1.DataUploadPhaseCompleted).
SourceNamespace("ns-1").
SourcePVC("pvc-1").

View File

@@ -145,6 +145,12 @@ func (d *DataUploadBuilder) Progress(progress shared.DataMoveOperationProgress)
return d
}
// IncrementalBytes sets the DataUpload's IncrementalBytes.
func (d *DataUploadBuilder) IncrementalBytes(incrementalBytes int64) *DataUploadBuilder {
d.object.Status.IncrementalBytes = incrementalBytes
return d
}
// Node sets the DataUpload's Node.
func (d *DataUploadBuilder) Node(node string) *DataUploadBuilder {
d.object.Status.Node = node

View File

@@ -713,6 +713,9 @@ func describeDataMovement(d *Describer, details bool, info *volume.BackupVolumeI
d.Printf("\t\t\t\tData Mover: %s\n", dataMover)
d.Printf("\t\t\t\tUploader Type: %s\n", info.SnapshotDataMovementInfo.UploaderType)
d.Printf("\t\t\t\tMoved data Size (bytes): %d\n", info.SnapshotDataMovementInfo.Size)
if info.SnapshotDataMovementInfo.IncrementalSize > 0 {
d.Printf("\t\t\t\tIncremental data Size (bytes): %d\n", info.SnapshotDataMovementInfo.IncrementalSize)
}
d.Printf("\t\t\t\tResult: %s\n", info.Result)
} else {
d.Printf("\t\t\tData Movement: %s\n", "included, specify --details for more information")
@@ -835,7 +838,7 @@ func describePodVolumeBackups(d *Describer, details bool, podVolumeBackups []vel
backupsByPod := new(volumesByPod)
for _, backup := range backupsByPhase[phase] {
backupsByPod.Add(backup.Spec.Pod.Namespace, backup.Spec.Pod.Name, backup.Spec.Volume, phase, backup.Status.Progress)
backupsByPod.Add(backup.Spec.Pod.Namespace, backup.Spec.Pod.Name, backup.Spec.Volume, phase, backup.Status.Progress, backup.Status.IncrementalBytes)
}
d.Printf("\t\t%s:\n", phase)
@@ -885,7 +888,8 @@ type volumesByPod struct {
// Add adds a pod volume with the specified pod namespace, name
// and volume to the appropriate group.
func (v *volumesByPod) Add(namespace, name, volume, phase string, progress veleroapishared.DataMoveOperationProgress) {
// Used for both backup and restore
func (v *volumesByPod) Add(namespace, name, volume, phase string, progress veleroapishared.DataMoveOperationProgress, incrementalBytes int64) {
if v.volumesByPodMap == nil {
v.volumesByPodMap = make(map[string]*podVolumeGroup)
}
@@ -895,6 +899,12 @@ func (v *volumesByPod) Add(namespace, name, volume, phase string, progress veler
// append backup progress percentage if backup is in progress
if phase == "In Progress" && progress.TotalBytes != 0 {
volume = fmt.Sprintf("%s (%.2f%%)", volume, float64(progress.BytesDone)/float64(progress.TotalBytes)*100)
} else if phase == string(velerov1api.PodVolumeBackupPhaseCompleted) && incrementalBytes > 0 {
volume = fmt.Sprintf("%s (size: %v, incremental size: %v)", volume, progress.TotalBytes, incrementalBytes)
} else if (phase == string(velerov1api.PodVolumeBackupPhaseCompleted) ||
phase == string(velerov1api.PodVolumeRestorePhaseCompleted)) &&
progress.TotalBytes > 0 {
volume = fmt.Sprintf("%s (size: %v)", volume, progress.TotalBytes)
}
if group, ok := v.volumesByPodMap[key]; !ok {

View File

@@ -597,11 +597,12 @@ func TestCSISnapshots(t *testing.T) {
Result: volume.VolumeResultFailed,
SnapshotDataMoved: true,
SnapshotDataMovementInfo: &volume.SnapshotDataMovementInfo{
UploaderType: "fake-uploader",
SnapshotHandle: "fake-repo-id-5",
OperationID: "fake-operation-5",
Size: 100,
Phase: velerov2alpha1.DataUploadPhaseFailed,
UploaderType: "fake-uploader",
SnapshotHandle: "fake-repo-id-5",
OperationID: "fake-operation-5",
Size: 100,
IncrementalSize: 50,
Phase: velerov2alpha1.DataUploadPhaseFailed,
},
},
},
@@ -613,6 +614,7 @@ func TestCSISnapshots(t *testing.T) {
Data Mover: velero
Uploader Type: fake-uploader
Moved data Size (bytes): 100
Incremental data Size (bytes): 50
Result: failed
`,
},

View File

@@ -464,6 +464,10 @@ func describeDataMovementInSF(details bool, info *volume.BackupVolumeInfo, snaps
dataMovement["uploaderType"] = info.SnapshotDataMovementInfo.UploaderType
dataMovement["result"] = string(info.Result)
if info.SnapshotDataMovementInfo.Size > 0 || info.SnapshotDataMovementInfo.IncrementalSize > 0 {
dataMovement["size"] = info.SnapshotDataMovementInfo.Size
dataMovement["incrementalSize"] = info.SnapshotDataMovementInfo.IncrementalSize
}
snapshotDetail["dataMovement"] = dataMovement
} else {
@@ -534,7 +538,7 @@ func describePodVolumeBackupsInSF(backups []velerov1api.PodVolumeBackup, details
// group the backups in the current phase by pod (i.e. "ns/name")
backupsByPod := new(volumesByPod)
for _, backup := range backupsByPhase[phase] {
backupsByPod.Add(backup.Spec.Pod.Namespace, backup.Spec.Pod.Name, backup.Spec.Volume, phase, backup.Status.Progress)
backupsByPod.Add(backup.Spec.Pod.Namespace, backup.Spec.Pod.Name, backup.Spec.Volume, phase, backup.Status.Progress, backup.Status.IncrementalBytes)
}
backupsByPods := make([]map[string]string, 0)

View File

@@ -408,7 +408,7 @@ func describePodVolumeRestores(d *Describer, restores []velerov1api.PodVolumeRes
restoresByPod := new(volumesByPod)
for _, restore := range restoresByPhase[phase] {
restoresByPod.Add(restore.Spec.Pod.Namespace, restore.Spec.Pod.Name, restore.Spec.Volume, phase, restore.Status.Progress)
restoresByPod.Add(restore.Spec.Pod.Namespace, restore.Spec.Pod.Name, restore.Spec.Volume, phase, restore.Status.Progress, 0)
}
d.Printf("\t%s:\n", phase)

View File

@@ -493,6 +493,7 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp
du.Status.Path = result.Backup.Source.ByPath
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted
du.Status.SnapshotID = result.Backup.SnapshotID
du.Status.IncrementalBytes = result.Backup.IncrementalBytes
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if result.Backup.EmptySnapshot {
du.Status.Message = "volume was empty so no data was upload"

View File

@@ -526,6 +526,7 @@ func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, nam
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseCompleted
pvb.Status.SnapshotID = result.Backup.SnapshotID
pvb.Status.CompletionTimestamp = &completionTime
pvb.Status.IncrementalBytes = result.Backup.IncrementalBytes
if result.Backup.EmptySnapshot {
pvb.Status.Message = "volume was empty so no snapshot was taken"
}

View File

@@ -156,7 +156,7 @@ func TestOnDataUploadCompleted(t *testing.T) {
{
name: "marshal fail",
marshalErr: errors.New("fake-marshal-error"),
expectedErr: "Failed to marshal backup result { false { } 0}: fake-marshal-error",
expectedErr: "Failed to marshal backup result { false { } 0 0}: fake-marshal-error",
},
{
name: "succeed",

View File

@@ -182,7 +182,7 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin
fs.wgDataPath.Done()
}()
snapshotID, emptySnapshot, totalBytes, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
snapshotID, emptySnapshot, totalBytes, incrementalBytes, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
backupParam.ParentSnapshot, source.VolMode, uploaderConfig, fs)
if err == provider.ErrorCanceled {
@@ -194,7 +194,7 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin
}
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
} else {
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source, totalBytes}})
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source, totalBytes, incrementalBytes}})
}
}()

View File

@@ -96,7 +96,7 @@ func TestAsyncBackup(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
mockProvider := providerMock.NewProvider(t)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.result.Backup.TotalBytes, test.err)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.result.Backup.TotalBytes, test.result.Backup.IncrementalBytes, test.err)
mockProvider.On("Close", mock.Anything).Return(nil)
fs.uploaderProv = mockProvider
fs.initialized = true

View File

@@ -30,10 +30,11 @@ type Result struct {
// BackupResult represents the result of a backup
type BackupResult struct {
SnapshotID string `json:"snapshotID"`
EmptySnapshot bool `json:"emptySnapshot"`
Source AccessPoint `json:"source,omitempty"`
TotalBytes int64 `json:"totalBytes,omitempty"`
SnapshotID string `json:"snapshotID"`
EmptySnapshot bool `json:"emptySnapshot"`
Source AccessPoint `json:"source,omitempty"`
TotalBytes int64 `json:"totalBytes,omitempty"`
IncrementalBytes int64 `json:"incrementalBytes,omitempty"`
}
// RestoreResult represents the result of a restore

View File

@@ -155,7 +155,7 @@ func TestOnDataPathCompleted(t *testing.T) {
{
name: "marshal fail",
marshalErr: errors.New("fake-marshal-error"),
expectedErr: "Failed to marshal backup result { false { } 0}: fake-marshal-error",
expectedErr: "Failed to marshal backup result { false { } 0 0}: fake-marshal-error",
},
{
name: "succeed",

View File

@@ -177,3 +177,7 @@ func (p *Progress) EstimationParameters() upload.EstimationParameters {
func (p *Progress) Enabled() bool {
return true
}
func (p *Progress) GetIncrementalSize() int64 {
return p.estimatedTotalBytes - p.cachedBytes
}

View File

@@ -120,13 +120,13 @@ func (kp *kopiaProvider) RunBackup(
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) (string, bool, int64, error) {
updater uploader.ProgressUpdater) (string, bool, int64, int64, error) {
if updater == nil {
return "", false, 0, errors.New("Need to initial backup progress updater first")
return "", false, 0, 0, errors.New("Need to initial backup progress updater first")
}
if path == "" {
return "", false, 0, errors.New("path is empty")
return "", false, 0, 0, errors.New("path is empty")
}
log := kp.log.WithFields(logrus.Fields{
@@ -136,7 +136,8 @@ func (kp *kopiaProvider) RunBackup(
})
repoWriter := kopia.NewShimRepo(kp.bkRepo)
kpUploader := upload.NewUploader(repoWriter)
kpUploader.Progress = kopia.NewProgress(updater, backupProgressCheckInterval, log)
progress := kopia.NewProgress(updater, backupProgressCheckInterval, log)
kpUploader.Progress = progress
kpUploader.FailFast = true
quit := make(chan struct{})
log.Info("Starting backup")
@@ -175,9 +176,9 @@ func (kp *kopiaProvider) RunBackup(
if kpUploader.IsCanceled() {
log.Warn("Kopia backup is canceled")
return snapshotID, false, 0, ErrorCanceled
return snapshotID, false, 0, 0, ErrorCanceled
}
return snapshotID, false, 0, errors.Wrapf(err, "Failed to run kopia backup")
return snapshotID, false, 0, 0, errors.Wrapf(err, "Failed to run kopia backup")
}
// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
@@ -189,7 +190,7 @@ func (kp *kopiaProvider) RunBackup(
)
log.Debugf("Kopia backup finished, snapshot ID %s, backup size %d", snapshotInfo.ID, snapshotInfo.Size)
return snapshotInfo.ID, false, snapshotInfo.Size, nil
return snapshotInfo.ID, false, snapshotInfo.Size, progress.GetIncrementalSize(), nil
}
func (kp *kopiaProvider) GetPassword(param any) (string, error) {

View File

@@ -106,7 +106,7 @@ func TestRunBackup(t *testing.T) {
tc.volMode = uploader.PersistentVolumeFilesystem
}
BackupFunc = tc.hookBackupFunc
_, _, _, err := kp.RunBackup(t.Context(), "var", "", nil, false, "", tc.volMode, map[string]string{}, &updater)
_, _, _, _, err := kp.RunBackup(t.Context(), "var", "", nil, false, "", tc.volMode, map[string]string{}, &updater)
if tc.notError {
assert.NoError(t, err)
} else {

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v2.39.1. DO NOT EDIT.
// Code generated by mockery v2.53.5. DO NOT EDIT.
package mocks
@@ -34,7 +34,7 @@ func (_m *Provider) Close(ctx context.Context) error {
}
// RunBackup provides a mock function with given fields: ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater
func (_m *Provider) RunBackup(ctx context.Context, path string, realSource string, tags map[string]string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, updater uploader.ProgressUpdater) (string, bool, int64, error) {
func (_m *Provider) RunBackup(ctx context.Context, path string, realSource string, tags map[string]string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, updater uploader.ProgressUpdater) (string, bool, int64, int64, error) {
ret := _m.Called(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater)
if len(ret) == 0 {
@@ -44,8 +44,9 @@ func (_m *Provider) RunBackup(ctx context.Context, path string, realSource strin
var r0 string
var r1 bool
var r2 int64
var r3 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) (string, bool, int64, error)); ok {
var r3 int64
var r4 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) (string, bool, int64, int64, error)); ok {
return rf(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater)
}
if rf, ok := ret.Get(0).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) string); ok {
@@ -66,13 +67,19 @@ func (_m *Provider) RunBackup(ctx context.Context, path string, realSource strin
r2 = ret.Get(2).(int64)
}
if rf, ok := ret.Get(3).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) error); ok {
if rf, ok := ret.Get(3).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) int64); ok {
r3 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater)
} else {
r3 = ret.Error(3)
r3 = ret.Get(3).(int64)
}
return r0, r1, r2, r3
if rf, ok := ret.Get(4).(func(context.Context, string, string, map[string]string, bool, string, uploader.PersistentVolumeMode, map[string]string, uploader.ProgressUpdater) error); ok {
r4 = rf(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, uploaderCfg, updater)
} else {
r4 = ret.Error(4)
}
return r0, r1, r2, r3, r4
}
// RunRestore provides a mock function with given fields: ctx, snapshotID, volumePath, volMode, uploaderConfig, updater

View File

@@ -50,7 +50,7 @@ type Provider interface {
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) (string, bool, int64, error)
updater uploader.ProgressUpdater) (string, bool, int64, int64, error)
// RunRestore which will do restore for one specific volume with given snapshot id and return error
// updater is used for updating backup progress which implement by third-party
RunRestore(

View File

@@ -124,21 +124,21 @@ func (rp *resticProvider) RunBackup(
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
uploaderCfg map[string]string,
updater uploader.ProgressUpdater) (string, bool, int64, error) {
updater uploader.ProgressUpdater) (string, bool, int64, int64, error) {
if updater == nil {
return "", false, 0, errors.New("Need to initial backup progress updater first")
return "", false, 0, 0, errors.New("Need to initial backup progress updater first")
}
if path == "" {
return "", false, 0, errors.New("path is empty")
return "", false, 0, 0, errors.New("path is empty")
}
if realSource != "" {
return "", false, 0, errors.New("real source is not empty, this is not supported by restic uploader")
return "", false, 0, 0, errors.New("real source is not empty, this is not supported by restic uploader")
}
if volMode == uploader.PersistentVolumeBlock {
return "", false, 0, errors.New("unable to support block mode")
return "", false, 0, 0, errors.New("unable to support block mode")
}
log := rp.log.WithFields(logrus.Fields{
@@ -149,7 +149,7 @@ func (rp *resticProvider) RunBackup(
if len(uploaderCfg) > 0 {
parallelFilesUpload, err := uploaderutil.GetParallelFilesUpload(uploaderCfg)
if err != nil {
return "", false, 0, errors.Wrap(err, "failed to get uploader config")
return "", false, 0, 0, errors.Wrap(err, "failed to get uploader config")
}
if parallelFilesUpload > 0 {
log.Warnf("ParallelFilesUpload is set to %d, but restic does not support parallel file uploads. Ignoring.", parallelFilesUpload)
@@ -171,9 +171,9 @@ func (rp *resticProvider) RunBackup(
if err != nil {
if strings.Contains(stderrBuf, "snapshot is empty") {
log.Debugf("Restic backup got empty dir with %s path", path)
return "", true, 0, nil
return "", true, 0, 0, nil
}
return "", false, 0, errors.WithStack(fmt.Errorf("error running restic backup command %s with error: %v stderr: %v", backupCmd.String(), err, stderrBuf))
return "", false, 0, 0, errors.WithStack(fmt.Errorf("error running restic backup command %s with error: %v stderr: %v", backupCmd.String(), err, stderrBuf))
}
// GetSnapshotID
snapshotIDCmd := resticGetSnapshotFunc(rp.repoIdentifier, rp.credentialsFile, tags)
@@ -184,10 +184,10 @@ func (rp *resticProvider) RunBackup(
}
snapshotID, err := resticGetSnapshotIDFunc(snapshotIDCmd)
if err != nil {
return "", false, 0, errors.WithStack(fmt.Errorf("error getting snapshot id with error: %v", err))
return "", false, 0, 0, errors.WithStack(fmt.Errorf("error getting snapshot id with error: %v", err))
}
log.Infof("Run command=%s, stdout=%s, stderr=%s", backupCmd.String(), summary, stderrBuf)
return snapshotID, false, 0, nil
return snapshotID, false, 0, 0, nil
}
// RunRestore runs a `restore` command and monitors the volume size to

View File

@@ -149,9 +149,9 @@ func TestResticRunBackup(t *testing.T) {
}
if !tc.nilUpdater {
updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: tc.rp.log, Ctx: t.Context(), Cli: fake.NewClientBuilder().WithScheme(util.VeleroScheme).Build()}
_, _, _, err = tc.rp.RunBackup(t.Context(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, map[string]string{}, &updater)
_, _, _, _, err = tc.rp.RunBackup(t.Context(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, map[string]string{}, &updater)
} else {
_, _, _, err = tc.rp.RunBackup(t.Context(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, map[string]string{}, nil)
_, _, _, _, err = tc.rp.RunBackup(t.Context(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, map[string]string{}, nil)
}
tc.rp.log.Infof("test name %v error %v", tc.name, err)

View File

@@ -116,12 +116,17 @@ velero backup create NAME --snapshot-move-data --data-mover DATA-MOVER-NAME OPTI
When the backup starts, you will see the `VolumeSnapshot` and `VolumeSnapshotContent` objects created, but after the backup finishes, the objects will disappear.
After snapshots are created, you will see one or more `DataUpload` CRs created.
You may also see some intermediate objects (i.e., pods, PVCs, PVs) created in Velero namespace or the cluster scope, they are to help data movers to move data. And they will be removed after the backup completes.
The phase of a `DataUpload` CR changes several times during the backup process and finally goes to one of the terminal status, `Completed`, `Failed` or `Cancelled`. You can see the phase changes as well as the data upload progress by watching the `DataUpload` CRs:
The phase of a `DataUpload` CR changes several times during the backup process and finally goes to one of the terminal status, `Completed`, `Failed` or `Cancelled`. You can see the phase changes as well as the data upload progress by watching the `DataUpload` CRs. While the `DataUpload` is being processed, progress is shown with `BYTES DONE` representing the amount of volume data that has been processed so far and `TOTAL BYTES` representing the estimated total volume data. Upon completion, these two numbers will be the same. In addition, once the `DataUpload` is done, `INCREMENTAL BYTES` will be filled in with the amount of data which is new or changed since the last backup of this volume. Note that the actual uploaded content may be smaller than `INCREMENTAL BYTES` due to kopia deduplication, compression, etc.
```bash
kubectl -n velero get datauploads -l velero.io/backup-name=YOUR_BACKUP_NAME -w
```
By default, `INCREMENTAL BYTES` is not displayed in the `kubectl get` output. To see this extended field, the `-o wide` arg is needed:
```bash
kubectl -n velero get datauploads -o wide -l velero.io/backup-name=YOUR_BACKUP_NAME -w
```
When the backup completes, you can view information about the backups:
```bash