From 1e182e5837c91df2fb58a519491578828434fff6 Mon Sep 17 00:00:00 2001 From: Adnan Abdulhussein Date: Tue, 10 Sep 2019 11:28:19 -0700 Subject: [PATCH] record restic backup progress in PodVolumeBackup (#1821) * record restic backup progress in PodVolumeBackup Signed-off-by: Adnan Abdulhussein --- Dockerfile-velero | 6 +- Dockerfile-velero-ppc64le | 4 +- changelogs/unreleased/1821-prydonius | 1 + pkg/apis/velero/v1/pod_volume_backup.go | 5 + .../v1/pod_volume_operation_progress.go | 24 ++++ pkg/apis/velero/v1/zz_generated.deepcopy.go | 17 +++ pkg/cmd/util/output/backup_describer.go | 9 +- pkg/cmd/util/output/restore_describer.go | 3 +- .../pod_volume_backup_controller.go | 15 ++- pkg/restic/command_factory.go | 2 +- pkg/restic/command_factory_test.go | 2 +- pkg/restic/exec_commands.go | 118 ++++++++++++++++++ pkg/restic/exec_commands_test.go | 80 ++++++++++++ 13 files changed, 274 insertions(+), 12 deletions(-) create mode 100644 changelogs/unreleased/1821-prydonius create mode 100644 pkg/apis/velero/v1/pod_volume_operation_progress.go create mode 100644 pkg/restic/exec_commands_test.go diff --git a/Dockerfile-velero b/Dockerfile-velero index 671ca56f2..3c9d0bf60 100644 --- a/Dockerfile-velero +++ b/Dockerfile-velero @@ -18,9 +18,9 @@ LABEL maintainer="Steve Kriss " RUN apt-get update && \ apt-get install -y --no-install-recommends ca-certificates wget bzip2 && \ - wget --quiet https://github.com/restic/restic/releases/download/v0.9.4/restic_0.9.4_linux_amd64.bz2 && \ - bunzip2 restic_0.9.4_linux_amd64.bz2 && \ - mv restic_0.9.4_linux_amd64 /usr/bin/restic && \ + wget --quiet https://github.com/restic/restic/releases/download/v0.9.5/restic_0.9.5_linux_amd64.bz2 && \ + bunzip2 restic_0.9.5_linux_amd64.bz2 && \ + mv restic_0.9.5_linux_amd64 /usr/bin/restic && \ chmod +x /usr/bin/restic && \ apt-get remove -y wget bzip2 && \ rm -rf /var/lib/apt/lists/* diff --git a/Dockerfile-velero-ppc64le b/Dockerfile-velero-ppc64le index e3bf18851..4a2ab0d58 100644 --- a/Dockerfile-velero-ppc64le +++ b/Dockerfile-velero-ppc64le @@ -18,8 +18,8 @@ LABEL maintainer="Steve Kriss " RUN apt-get update && \ apt-get install -y --no-install-recommends ca-certificates wget && \ - wget --quiet https://oplab9.parqtec.unicamp.br/pub/ppc64el/restic/restic-0.9.4 && \ - mv restic-0.9.4 /usr/bin/restic && \ + wget --quiet https://oplab9.parqtec.unicamp.br/pub/ppc64el/restic/restic-0.9.5 && \ + mv restic-0.9.5 /usr/bin/restic && \ chmod +x /usr/bin/restic && \ apt-get remove -y wget && \ rm -rf /var/lib/apt/lists/* diff --git a/changelogs/unreleased/1821-prydonius b/changelogs/unreleased/1821-prydonius new file mode 100644 index 000000000..f613620a8 --- /dev/null +++ b/changelogs/unreleased/1821-prydonius @@ -0,0 +1 @@ +report backup progress in PodVolumeBackups and expose progress in the velero backup describe --details command. Also upgrades restic to v0.9.5 diff --git a/pkg/apis/velero/v1/pod_volume_backup.go b/pkg/apis/velero/v1/pod_volume_backup.go index 330fda00e..40cbd761d 100644 --- a/pkg/apis/velero/v1/pod_volume_backup.go +++ b/pkg/apis/velero/v1/pod_volume_backup.go @@ -80,6 +80,11 @@ type PodVolumeBackupStatus struct { // Completion time is recorded before uploading the backup object. // The server's time is used for CompletionTimestamps CompletionTimestamp metav1.Time `json:"completionTimestamp"` + + // Progress holds the total number of bytes of the volume and the current + // number of backed up bytes. This can be used to display progress information + // about the backup operation. + Progress PodVolumeOperationProgress `json:"progress"` } // +genclient diff --git a/pkg/apis/velero/v1/pod_volume_operation_progress.go b/pkg/apis/velero/v1/pod_volume_operation_progress.go new file mode 100644 index 000000000..4cde13dc3 --- /dev/null +++ b/pkg/apis/velero/v1/pod_volume_operation_progress.go @@ -0,0 +1,24 @@ +/* +Copyright 2019 the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +// PodVolumeOperationProgress represents the progress of a +// PodVolumeBackup/Restore (restic) operation +type PodVolumeOperationProgress struct { + TotalBytes int64 `json:"totalBytes"` + BytesDone int64 `json:"bytesDone"` +} diff --git a/pkg/apis/velero/v1/zz_generated.deepcopy.go b/pkg/apis/velero/v1/zz_generated.deepcopy.go index ef51072cc..0cc4aaa2d 100644 --- a/pkg/apis/velero/v1/zz_generated.deepcopy.go +++ b/pkg/apis/velero/v1/zz_generated.deepcopy.go @@ -722,6 +722,7 @@ func (in *PodVolumeBackupStatus) DeepCopyInto(out *PodVolumeBackupStatus) { *out = *in in.StartTimestamp.DeepCopyInto(&out.StartTimestamp) in.CompletionTimestamp.DeepCopyInto(&out.CompletionTimestamp) + out.Progress = in.Progress return } @@ -735,6 +736,22 @@ func (in *PodVolumeBackupStatus) DeepCopy() *PodVolumeBackupStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PodVolumeOperationProgress) DeepCopyInto(out *PodVolumeOperationProgress) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodVolumeOperationProgress. +func (in *PodVolumeOperationProgress) DeepCopy() *PodVolumeOperationProgress { + if in == nil { + return nil + } + out := new(PodVolumeOperationProgress) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PodVolumeRestore) DeepCopyInto(out *PodVolumeRestore) { *out = *in diff --git a/pkg/cmd/util/output/backup_describer.go b/pkg/cmd/util/output/backup_describer.go index c1fd8b1bf..20d48ad8e 100644 --- a/pkg/cmd/util/output/backup_describer.go +++ b/pkg/cmd/util/output/backup_describer.go @@ -377,7 +377,7 @@ func DescribePodVolumeBackups(d *Describer, backups []velerov1api.PodVolumeBacku backupsByPod := new(volumesByPod) for _, backup := range backupsByPhase[phase] { - backupsByPod.Add(backup.Spec.Pod.Namespace, backup.Spec.Pod.Name, backup.Spec.Volume) + backupsByPod.Add(backup.Spec.Pod.Namespace, backup.Spec.Pod.Name, backup.Spec.Volume, phase, backup.Status.Progress) } d.Printf("\t%s:\n", phase) @@ -423,13 +423,18 @@ type volumesByPod struct { // Add adds a pod volume with the specified pod namespace, name // and volume to the appropriate group. -func (v *volumesByPod) Add(namespace, name, volume string) { +func (v *volumesByPod) Add(namespace, name, volume, phase string, progress velerov1api.PodVolumeOperationProgress) { if v.volumesByPodMap == nil { v.volumesByPodMap = make(map[string]*podVolumeGroup) } key := fmt.Sprintf("%s/%s", namespace, name) + // append backup progress percentage if backup is in progress + if phase == "In Progress" && progress != (velerov1api.PodVolumeOperationProgress{}) { + volume = fmt.Sprintf("%s (%.2f%%)", volume, float64(progress.BytesDone)/float64(progress.TotalBytes)*100) + } + if group, ok := v.volumesByPodMap[key]; !ok { group := &podVolumeGroup{ label: key, diff --git a/pkg/cmd/util/output/restore_describer.go b/pkg/cmd/util/output/restore_describer.go index dada2aca4..d81ab951b 100644 --- a/pkg/cmd/util/output/restore_describer.go +++ b/pkg/cmd/util/output/restore_describer.go @@ -188,7 +188,8 @@ func describePodVolumeRestores(d *Describer, restores []v1.PodVolumeRestore, det restoresByPod := new(volumesByPod) for _, restore := range restoresByPhase[phase] { - restoresByPod.Add(restore.Spec.Pod.Namespace, restore.Spec.Pod.Name, restore.Spec.Volume) + // TODO(adnan): replace last parameter with progress from status (#1749) + restoresByPod.Add(restore.Spec.Pod.Namespace, restore.Spec.Pod.Name, restore.Spec.Volume, phase, v1.PodVolumeOperationProgress{}) } d.Printf("\t%s:\n", phase) diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index 9b3e8cbcb..d83ee9fa2 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -39,7 +39,6 @@ import ( informers "github.com/heptio/velero/pkg/generated/informers/externalversions/velero/v1" listers "github.com/heptio/velero/pkg/generated/listers/velero/v1" "github.com/heptio/velero/pkg/restic" - veleroexec "github.com/heptio/velero/pkg/util/exec" "github.com/heptio/velero/pkg/util/filesystem" "github.com/heptio/velero/pkg/util/kube" ) @@ -254,7 +253,7 @@ func (c *podVolumeBackupController) processBackup(req *velerov1api.PodVolumeBack var stdout, stderr string var emptySnapshot bool - if stdout, stderr, err = veleroexec.RunCommand(resticCmd.Cmd()); err != nil { + if stdout, stderr, err = restic.RunBackup(resticCmd, log, c.updateBackupProgressFunc(req, log)); err != nil { if strings.Contains(stderr, "snapshot is empty") { emptySnapshot = true } else { @@ -361,6 +360,18 @@ func (c *podVolumeBackupController) patchPodVolumeBackup(req *velerov1api.PodVol return req, nil } +// updateBackupProgressFunc returns a func that takes progress info and patches +// the PVB with the new progress +func (c *podVolumeBackupController) updateBackupProgressFunc(req *velerov1api.PodVolumeBackup, log logrus.FieldLogger) func(velerov1api.PodVolumeOperationProgress) { + return func(progress velerov1api.PodVolumeOperationProgress) { + if _, err := c.patchPodVolumeBackup(req, func(r *velerov1api.PodVolumeBackup) { + r.Status.Progress = progress + }); err != nil { + log.WithError(err).Error("error updating PodVolumeBackup progress") + } + } +} + func (c *podVolumeBackupController) fail(req *velerov1api.PodVolumeBackup, msg string, log logrus.FieldLogger) error { if _, err := c.patchPodVolumeBackup(req, func(r *velerov1api.PodVolumeBackup) { r.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed diff --git a/pkg/restic/command_factory.go b/pkg/restic/command_factory.go index 95a2d6009..6217fba67 100644 --- a/pkg/restic/command_factory.go +++ b/pkg/restic/command_factory.go @@ -34,7 +34,7 @@ func BackupCommand(repoIdentifier, passwordFile, path string, tags map[string]st PasswordFile: passwordFile, Dir: path, Args: []string{"."}, - ExtraFlags: append(backupTagFlags(tags), "--host=velero"), + ExtraFlags: append(backupTagFlags(tags), "--host=velero", "--json"), } } diff --git a/pkg/restic/command_factory_test.go b/pkg/restic/command_factory_test.go index 6cd0ab961..d4236198f 100644 --- a/pkg/restic/command_factory_test.go +++ b/pkg/restic/command_factory_test.go @@ -32,7 +32,7 @@ func TestBackupCommand(t *testing.T) { assert.Equal(t, "path", c.Dir) assert.Equal(t, []string{"."}, c.Args) - expected := []string{"--tag=foo=bar", "--tag=c=d", "--host=velero"} + expected := []string{"--tag=foo=bar", "--tag=c=d", "--host=velero", "--json"} sort.Strings(expected) sort.Strings(c.ExtraFlags) assert.Equal(t, expected, c.ExtraFlags) diff --git a/pkg/restic/exec_commands.go b/pkg/restic/exec_commands.go index c75a45130..b021774d7 100644 --- a/pkg/restic/exec_commands.go +++ b/pkg/restic/exec_commands.go @@ -17,13 +17,29 @@ limitations under the License. package restic import ( + "bytes" "encoding/json" + "fmt" + "time" "github.com/pkg/errors" + "github.com/sirupsen/logrus" + velerov1api "github.com/heptio/velero/pkg/apis/velero/v1" "github.com/heptio/velero/pkg/util/exec" ) +const backupProgressCheckInterval = 10 * time.Second + +type backupStatusLine struct { + MessageType string `json:"message_type"` + // seen in status lines + TotalBytes int64 `json:"total_bytes"` + BytesDone int64 `json:"bytes_done"` + // seen in summary line at the end + TotalBytesProcessed int64 `json:"total_bytes_processed"` +} + // GetSnapshotID runs a 'restic snapshots' command to get the ID of the snapshot // in the specified repo matching the set of provided tags, or an error if a // unique snapshot cannot be identified. @@ -53,3 +69,105 @@ func GetSnapshotID(repoIdentifier, passwordFile string, tags map[string]string, return snapshots[0].ShortID, nil } + +// RunBackup runs a `restic backup` command and watches the output to provide +// progress updates to the caller. +func RunBackup(backupCmd *Command, log logrus.FieldLogger, updateFunc func(velerov1api.PodVolumeOperationProgress)) (string, string, error) { + // buffers for copying command stdout/err output into + stdoutBuf := new(bytes.Buffer) + stderrBuf := new(bytes.Buffer) + + // create a channel to signal when to end the goroutine scanning for progress + // updates + quit := make(chan struct{}) + + cmd := backupCmd.Cmd() + cmd.Stdout = stdoutBuf + cmd.Stderr = stderrBuf + + cmd.Start() + + go func() { + ticker := time.NewTicker(backupProgressCheckInterval) + for { + select { + case <-ticker.C: + lastLine := getLastLine(stdoutBuf.Bytes()) + stat, err := decodeBackupStatusLine(lastLine) + if err != nil { + log.WithError(err).Errorf("error getting restic backup progress") + } + + // if the line contains a non-empty bytes_done field, we can update the + // caller with the progress + if stat.BytesDone != 0 { + updateFunc(velerov1api.PodVolumeOperationProgress{ + TotalBytes: stat.TotalBytes, + BytesDone: stat.BytesDone, + }) + } + case <-quit: + ticker.Stop() + return + } + } + }() + + cmd.Wait() + quit <- struct{}{} + + summary, err := getSummaryLine(stdoutBuf.Bytes()) + if err != nil { + return stdoutBuf.String(), stderrBuf.String(), err + } + stat, err := decodeBackupStatusLine(summary) + if err != nil { + return stdoutBuf.String(), stderrBuf.String(), err + } + if stat.MessageType != "summary" { + return stdoutBuf.String(), stderrBuf.String(), errors.WithStack(fmt.Errorf("error getting restic backup summary: %s", string(summary))) + } + + // update progress to 100% + updateFunc(velerov1api.PodVolumeOperationProgress{ + TotalBytes: stat.TotalBytesProcessed, + BytesDone: stat.TotalBytesProcessed, + }) + + return string(summary), stderrBuf.String(), nil +} + +func decodeBackupStatusLine(lastLine []byte) (backupStatusLine, error) { + var stat backupStatusLine + if err := json.Unmarshal(lastLine, &stat); err != nil { + return stat, errors.Wrapf(err, "unable to decode backup JSON line: %s", string(lastLine)) + } + return stat, nil +} + +// getLastLine returns the last line of a byte array. The string is assumed to +// have a newline at the end of it, so this returns the substring between the +// last two newlines. +func getLastLine(b []byte) []byte { + // subslice the byte array to ignore the newline at the end of the string + lastNewLineIdx := bytes.LastIndex(b[:len(b)-1], []byte("\n")) + return b[lastNewLineIdx+1 : len(b)-1] +} + +// getSummaryLine looks for the summary JSON line +// (`{"message_type:"summary",...`) in the restic backup command output. Due to +// an issue in Restic, this might not always be the last line +// (https://github.com/restic/restic/issues/2389). It returns an error if it +// can't be found. +func getSummaryLine(b []byte) ([]byte, error) { + summaryLineIdx := bytes.LastIndex(b, []byte(`{"message_type":"summary"`)) + if summaryLineIdx < 0 { + return nil, errors.New("unable to find summary in restic backup command output") + } + // find the end of the summary line + newLineIdx := bytes.Index(b[summaryLineIdx:], []byte("\n")) + if newLineIdx < 0 { + return nil, errors.New("unable to get summary line from restic backup command output") + } + return b[summaryLineIdx : summaryLineIdx+newLineIdx], nil +} diff --git a/pkg/restic/exec_commands_test.go b/pkg/restic/exec_commands_test.go new file mode 100644 index 000000000..ee0257eae --- /dev/null +++ b/pkg/restic/exec_commands_test.go @@ -0,0 +1,80 @@ +/* +Copyright 2019 the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package restic + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_getSummaryLine(t *testing.T) { + summaryLine := `{"message_type":"summary","files_new":0,"files_changed":0,"files_unmodified":3,"dirs_new":0,"dirs_changed":0,"dirs_unmodified":0,"data_blobs":0,"tree_blobs":0,"data_added":0,"total_files_processed":3,"total_bytes_processed":13238272000,"total_duration":0.319265105,"snapshot_id":"38515bb5"}` + tests := []struct { + name string + output string + wantErr bool + }{ + {"no summary", `{"message_type":"status","percent_done":0,"total_files":1,"total_bytes":10485760000} +{"message_type":"status","percent_done":0,"total_files":3,"files_done":1,"total_bytes":13238272000} +`, true}, + {"no newline after summary", `{"message_type":"status","percent_done":0,"total_files":1,"total_bytes":10485760000} +{"message_type":"status","percent_done":0,"total_files":3,"files_done":1,"total_bytes":13238272000} +{"message_type":"summary","files_new":0,"files_changed":0,"files_unmodified":3,"dirs_new":0`, true}, + {"summary at end", `{"message_type":"status","percent_done":0,"total_files":1,"total_bytes":10485760000} +{"message_type":"status","percent_done":0,"total_files":3,"files_done":1,"total_bytes":13238272000} +{"message_type":"status","percent_done":1,"total_files":3,"files_done":3,"total_bytes":13238272000,"bytes_done":13238272000} +{"message_type":"summary","files_new":0,"files_changed":0,"files_unmodified":3,"dirs_new":0,"dirs_changed":0,"dirs_unmodified":0,"data_blobs":0,"tree_blobs":0,"data_added":0,"total_files_processed":3,"total_bytes_processed":13238272000,"total_duration":0.319265105,"snapshot_id":"38515bb5"} +`, false}, + {"summary before status", `{"message_type":"status","percent_done":0,"total_files":1,"total_bytes":10485760000} +{"message_type":"status","percent_done":0,"total_files":3,"files_done":1,"total_bytes":13238272000} +{"message_type":"summary","files_new":0,"files_changed":0,"files_unmodified":3,"dirs_new":0,"dirs_changed":0,"dirs_unmodified":0,"data_blobs":0,"tree_blobs":0,"data_added":0,"total_files_processed":3,"total_bytes_processed":13238272000,"total_duration":0.319265105,"snapshot_id":"38515bb5"} +{"message_type":"status","percent_done":1,"total_files":3,"files_done":3,"total_bytes":13238272000,"bytes_done":13238272000} +`, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + summary, err := getSummaryLine([]byte(tt.output)) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, summaryLine, string(summary)) + } + }) + } +} + +func Test_getLastLine(t *testing.T) { + tests := []struct { + output string + want string + }{ + {`last line +`, "last line"}, + {`first line +second line +third line +`, "third line"}, + } + for _, tt := range tests { + t.Run(tt.want, func(t *testing.T) { + assert.Equal(t, []byte(tt.want), getLastLine([]byte(tt.output))) + }) + } +}