add progress info to backup status

Signed-off-by: Steve Kriss <krisss@vmware.com>
This commit is contained in:
Steve Kriss
2020-04-22 12:14:09 -06:00
parent 30ca0e4322
commit 4b0f654a1e
8 changed files with 155 additions and 2 deletions

View File

@@ -273,6 +273,26 @@ type BackupStatus struct {
// file in object storage.
// +optional
Errors int `json:"errors,omitempty"`
// Progress contains information about the backup's execution progress. Note
// that this information is best-effort only -- if Velero fails to update it
// during a backup for any reason, it may be inaccurate/stale.
Progress *BackupProgress `json:"progress,omitempty"`
}
// BackupProgress stores information about the progress of a Backup's execution.
type BackupProgress struct {
// TotalItems is the total number of items to be backed up. This number may change
// throughout the execution of the backup due to plugins that return additional related
// items to back up, the velero.io/exclude-from-backup label, and various other
// filters that happen as items are processed.
// +optional
TotalItems int `json:"totalItems,omitempty"`
// ItemsBackedUp is the number of items that have actually been written to the
// backup tarball so far.
// +optional
ItemsBackedUp int `json:"itemsBackedUp,omitempty"`
}
// +genclient

View File

@@ -109,6 +109,22 @@ func (in *BackupList) DeepCopyObject() runtime.Object {
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *BackupProgress) DeepCopyInto(out *BackupProgress) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BackupProgress.
func (in *BackupProgress) DeepCopy() *BackupProgress {
if in == nil {
return nil
}
out := new(BackupProgress)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *BackupResourceHook) DeepCopyInto(out *BackupResourceHook) {
*out = *in
@@ -263,6 +279,11 @@ func (in *BackupStatus) DeepCopyInto(out *BackupStatus) {
in, out := &in.CompletionTimestamp, &out.CompletionTimestamp
*out = (*in).DeepCopy()
}
if in.Progress != nil {
in, out := &in.Progress, &out.Progress
*out = new(BackupProgress)
**out = **in
}
return
}

View File

@@ -35,11 +35,13 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/discovery"
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
"github.com/vmware-tanzu/velero/pkg/podexec"
@@ -63,6 +65,7 @@ type Backupper interface {
// kubernetesBackupper implements Backupper.
type kubernetesBackupper struct {
backupClient velerov1client.BackupsGetter
dynamicFactory client.DynamicFactory
discoveryHelper discovery.Helper
podCommandExecutor podexec.PodCommandExecutor
@@ -94,6 +97,7 @@ func cohabitatingResources() map[string]*cohabitatingResource {
// NewKubernetesBackupper creates a new kubernetesBackupper.
func NewKubernetesBackupper(
backupClient velerov1client.BackupsGetter,
discoveryHelper discovery.Helper,
dynamicFactory client.DynamicFactory,
podCommandExecutor podexec.PodCommandExecutor,
@@ -101,6 +105,7 @@ func NewKubernetesBackupper(
resticTimeout time.Duration,
) (Backupper, error) {
return &kubernetesBackupper{
backupClient: backupClient,
discoveryHelper: discoveryHelper,
dynamicFactory: dynamicFactory,
podCommandExecutor: podCommandExecutor,
@@ -289,6 +294,11 @@ func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Req
items := collector.getAllItems()
log.WithField("progress", "").Infof("Collected %d items matching the backup spec from the Kubernetes API (actual number of items backed up may be more or less depending on velero.io/exclude-from-backup annotation, plugins returning additional related items to back up, etc.)", len(items))
patch := fmt.Sprintf(`{"status":{"progress":{"totalItems":%d}}}`, len(items))
if _, err := kb.backupClient.Backups(backupRequest.Namespace).Patch(backupRequest.Name, types.MergePatchType, []byte(patch)); err != nil {
log.WithError(errors.WithStack((err))).Warn("Got error trying to update backup's status.progress.totalItems")
}
itemBackupper := &itemBackupper{
backupRequest: backupRequest,
tarWriter: tw,
@@ -302,7 +312,51 @@ func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Req
},
}
// helper struct to send current progress between the main
// backup loop and the gouroutine that periodically patches
// the backup CR with progress updates
type progressUpdate struct {
totalItems, itemsBackedUp int
}
// the main backup process will send on this channel once
// for every item it processes.
update := make(chan progressUpdate)
// the main backup process will send on this channel when
// it's done sending progress updates
quit := make(chan struct{})
// This is the progress updater goroutine that receives
// progress updates on the 'update' channel. It patches
// the backup CR with progress updates at most every second,
// but it will not issue a patch if it hasn't received a new
// update since the previous patch. This goroutine exits
// when it receives on the 'quit' channel.
go func() {
ticker := time.NewTicker(1 * time.Second)
var lastUpdate *progressUpdate
for {
select {
case <-quit:
ticker.Stop()
return
case val := <-update:
lastUpdate = &val
case <-ticker.C:
if lastUpdate != nil {
patch := fmt.Sprintf(`{"status":{"progress":{"totalItems":%d,"itemsBackedUp":%d}}}`, lastUpdate.totalItems, lastUpdate.itemsBackedUp)
if _, err := kb.backupClient.Backups(backupRequest.Namespace).Patch(backupRequest.Name, types.MergePatchType, []byte(patch)); err != nil {
log.WithError(errors.WithStack((err))).Warn("Got error trying to update backup's status.progress")
}
lastUpdate = nil
}
}
}
}()
backedUpGroupResources := map[schema.GroupResource]bool{}
totalItems := len(items)
for i, item := range items {
log.WithFields(map[string]interface{}{
@@ -310,7 +364,7 @@ func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Req
"resource": item.groupResource.String(),
"namespace": item.namespace,
"name": item.name,
}).Infof("Processing item %d of %d", i+1, len(items))
}).Infof("Processing item")
// use an anonymous func so we can defer-close/remove the file
// as soon as we're done with it
@@ -334,8 +388,28 @@ func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Req
backedUpGroupResources[item.groupResource] = true
}
}()
// updated total is computed as "how many items we've backed up so far, plus
// how many items we know of that are remaining"
totalItems = len(backupRequest.BackedUpItems) + (len(items) - (i + 1))
// send a progress update
update <- progressUpdate{
totalItems: totalItems,
itemsBackedUp: len(backupRequest.BackedUpItems),
}
log.WithFields(map[string]interface{}{
"progress": "",
"resource": item.groupResource.String(),
"namespace": item.namespace,
"name": item.name,
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", len(backupRequest.BackedUpItems), totalItems)
}
// no more progress updates will be sent on the 'update' channel
quit <- struct{}{}
// back up CRD for resource if found. We should only need to do this if we've backed up at least
// one item for the resource and IncludeClusterResources is nil. If IncludeClusterResources is false
// we don't want to back it up, and if it's true it will already be included.
@@ -345,6 +419,13 @@ func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Req
}
}
// do a final update on progress since we may have just added some CRDs and may not have updated
// for the last few processed items.
patch = fmt.Sprintf(`{"status":{"progress":{"totalItems":%d,"itemsBackedUp":%d}}}`, len(backupRequest.BackedUpItems), len(backupRequest.BackedUpItems))
if _, err := kb.backupClient.Backups(backupRequest.Namespace).Patch(backupRequest.Name, types.MergePatchType, []byte(patch)); err != nil {
log.WithError(errors.WithStack((err))).Warn("Got error trying to update backup's status.progress")
}
log.WithField("progress", "").Infof("Backed up a total of %d items", len(backupRequest.BackedUpItems))
return nil

View File

@@ -2690,6 +2690,7 @@ func newHarness(t *testing.T) *harness {
return &harness{
APIServer: apiServer,
backupper: &kubernetesBackupper{
backupClient: apiServer.VeleroClient.VeleroV1(),
dynamicFactory: client.NewDynamicFactory(apiServer.DynamicClient),
discoveryHelper: discoveryHelper,

View File

@@ -630,6 +630,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
backupControllerRunInfo := func() controllerRunInfo {
backupper, err := backup.NewKubernetesBackupper(
s.veleroClient.VeleroV1(),
s.discoveryHelper,
client.NewDynamicFactory(s.dynamicClient),
podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),

View File

@@ -238,6 +238,18 @@ func DescribeBackupStatus(d *Describer, backup *velerov1api.Backup, details bool
d.Printf("Expiration:\t%s\n", status.Expiration)
d.Println()
if backup.Status.Progress != nil {
if backup.Status.Phase == velerov1api.BackupPhaseInProgress {
d.Printf("Estimated total items to be backed up:\t%d\n", backup.Status.Progress.TotalItems)
d.Printf("Items backed up so far:\t%d\n", backup.Status.Progress.ItemsBackedUp)
} else {
d.Printf("Total items to be backed up:\t%d\n", backup.Status.Progress.TotalItems)
d.Printf("Items backed up:\t%d\n", backup.Status.Progress.ItemsBackedUp)
}
d.Println()
}
if details {
describeBackupResourceList(d, backup, veleroClient, insecureSkipTLSVerify, caCertPath)
d.Println()

File diff suppressed because one or more lines are too long

View File

@@ -355,6 +355,23 @@ spec:
- Failed
- Deleting
type: string
progress:
description: Progress contains information about the backup's execution
progress. Note that this information is best-effort only -- if Velero
fails to update it during a backup for any reason, it may be inaccurate/stale.
properties:
itemsBackedUp:
description: ItemsBackedUp is the number of items that have actually
been written to the backup tarball so far.
type: integer
totalItems:
description: TotalItems is the total number of items to be backed
up. This number may change throughout the execution of the backup
due to plugins that return additional related items to back up,
the velero.io/exclude-from-backup label, and various other filters
that happen as items are processed.
type: integer
type: object
startTimestamp:
description: StartTimestamp records the time a backup was started. Separate
from CreationTimestamp, since that value changes on restores. The