Merge pull request #2440 from skriss/backup-progress

report backup progress
This commit is contained in:
Nolan Brubaker
2020-05-07 14:51:20 -04:00
committed by GitHub
13 changed files with 678 additions and 651 deletions

View File

@@ -273,6 +273,26 @@ type BackupStatus struct {
// file in object storage.
// +optional
Errors int `json:"errors,omitempty"`
// Progress contains information about the backup's execution progress. Note
// that this information is best-effort only -- if Velero fails to update it
// during a backup for any reason, it may be inaccurate/stale.
Progress *BackupProgress `json:"progress,omitempty"`
}
// BackupProgress stores information about the progress of a Backup's execution.
type BackupProgress struct {
// TotalItems is the total number of items to be backed up. This number may change
// throughout the execution of the backup due to plugins that return additional related
// items to back up, the velero.io/exclude-from-backup label, and various other
// filters that happen as items are processed.
// +optional
TotalItems int `json:"totalItems,omitempty"`
// ItemsBackedUp is the number of items that have actually been written to the
// backup tarball so far.
// +optional
ItemsBackedUp int `json:"itemsBackedUp,omitempty"`
}
// +genclient

View File

@@ -109,6 +109,22 @@ func (in *BackupList) DeepCopyObject() runtime.Object {
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *BackupProgress) DeepCopyInto(out *BackupProgress) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BackupProgress.
func (in *BackupProgress) DeepCopy() *BackupProgress {
if in == nil {
return nil
}
out := new(BackupProgress)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *BackupResourceHook) DeepCopyInto(out *BackupResourceHook) {
*out = *in
@@ -263,6 +279,11 @@ func (in *BackupStatus) DeepCopyInto(out *BackupStatus) {
in, out := &in.CompletionTimestamp, &out.CompletionTimestamp
*out = (*in).DeepCopy()
}
if in.Progress != nil {
in, out := &in.Progress, &out.Progress
*out = new(BackupProgress)
**out = **in
}
return
}

View File

@@ -1,5 +1,5 @@
/*
Copyright 2017 the Velero contributors.
Copyright 2017, 2020 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -20,20 +20,29 @@ import (
"archive/tar"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/discovery"
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
"github.com/vmware-tanzu/velero/pkg/podexec"
"github.com/vmware-tanzu/velero/pkg/restic"
@@ -49,17 +58,17 @@ const BackupFormatVersion = "1.1.0"
// Backupper performs backups.
type Backupper interface {
// Backup takes a backup using the specification in the api.Backup and writes backup and log data
// Backup takes a backup using the specification in the velerov1api.Backup and writes backup and log data
// to the given writers.
Backup(logger logrus.FieldLogger, backup *Request, backupFile io.Writer, actions []velero.BackupItemAction, volumeSnapshotterGetter VolumeSnapshotterGetter) error
}
// kubernetesBackupper implements Backupper.
type kubernetesBackupper struct {
backupClient velerov1client.BackupsGetter
dynamicFactory client.DynamicFactory
discoveryHelper discovery.Helper
podCommandExecutor podexec.PodCommandExecutor
groupBackupperFactory groupBackupperFactory
resticBackupperFactory restic.BackupperFactory
resticTimeout time.Duration
}
@@ -88,6 +97,7 @@ func cohabitatingResources() map[string]*cohabitatingResource {
// NewKubernetesBackupper creates a new kubernetesBackupper.
func NewKubernetesBackupper(
backupClient velerov1client.BackupsGetter,
discoveryHelper discovery.Helper,
dynamicFactory client.DynamicFactory,
podCommandExecutor podexec.PodCommandExecutor,
@@ -95,10 +105,10 @@ func NewKubernetesBackupper(
resticTimeout time.Duration,
) (Backupper, error) {
return &kubernetesBackupper{
backupClient: backupClient,
discoveryHelper: discoveryHelper,
dynamicFactory: dynamicFactory,
podCommandExecutor: podCommandExecutor,
groupBackupperFactory: &defaultGroupBackupperFactory{},
resticBackupperFactory: resticBackupperFactory,
resticTimeout: resticTimeout,
}, nil
@@ -162,11 +172,11 @@ func getResourceIncludesExcludes(helper discovery.Helper, includes, excludes []s
// getNamespaceIncludesExcludes returns an IncludesExcludes list containing which namespaces to
// include and exclude from the backup.
func getNamespaceIncludesExcludes(backup *api.Backup) *collections.IncludesExcludes {
func getNamespaceIncludesExcludes(backup *velerov1api.Backup) *collections.IncludesExcludes {
return collections.NewIncludesExcludes().Includes(backup.Spec.IncludedNamespaces...).Excludes(backup.Spec.ExcludedNamespaces...)
}
func getResourceHooks(hookSpecs []api.BackupResourceHookSpec, discoveryHelper discovery.Helper) ([]resourceHook, error) {
func getResourceHooks(hookSpecs []velerov1api.BackupResourceHookSpec, discoveryHelper discovery.Helper) ([]resourceHook, error) {
resourceHooks := make([]resourceHook, 0, len(hookSpecs))
for _, s := range hookSpecs {
@@ -181,7 +191,7 @@ func getResourceHooks(hookSpecs []api.BackupResourceHookSpec, discoveryHelper di
return resourceHooks, nil
}
func getResourceHook(hookSpec api.BackupResourceHookSpec, discoveryHelper discovery.Helper) (resourceHook, error) {
func getResourceHook(hookSpec velerov1api.BackupResourceHookSpec, discoveryHelper discovery.Helper) (resourceHook, error) {
h := resourceHook{
name: hookSpec.Name,
namespaces: collections.NewIncludesExcludes().Includes(hookSpec.IncludedNamespaces...).Excludes(hookSpec.ExcludedNamespaces...),
@@ -206,7 +216,7 @@ type VolumeSnapshotterGetter interface {
}
// Backup backs up the items specified in the Backup, placing them in a gzip-compressed tar file
// written to backupFile. The finalized api.Backup is written to metadata. Any error that represents
// written to backupFile. The finalized velerov1api.Backup is written to metadata. Any error that represents
// a complete backup failure is returned. Errors that constitute partial failures (i.e. failures to
// back up individual resources that don't prevent the backup from continuing to be processed) are logged
// to the backup log.
@@ -244,7 +254,7 @@ func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Req
backupRequest.BackedUpItems = map[itemKey]struct{}{}
podVolumeTimeout := kb.resticTimeout
if val := backupRequest.Annotations[api.PodVolumeOperationTimeoutAnnotation]; val != "" {
if val := backupRequest.Annotations[velerov1api.PodVolumeOperationTimeoutAnnotation]; val != "" {
parsed, err := time.ParseDuration(val)
if err != nil {
log.WithError(errors.WithStack(err)).Errorf("Unable to parse pod volume timeout annotation %s, using server value.", val)
@@ -264,30 +274,222 @@ func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Req
}
}
gb := kb.groupBackupperFactory.newGroupBackupper(
log,
backupRequest,
kb.dynamicFactory,
kb.discoveryHelper,
cohabitatingResources(),
kb.podCommandExecutor,
tw,
resticBackupper,
newPVCSnapshotTracker(),
volumeSnapshotterGetter,
)
// set up a temp dir for the itemCollector to use to temporarily
// store items as they're scraped from the API.
tempDir, err := ioutil.TempDir("", "")
if err != nil {
return errors.Wrap(err, "error creating temp dir for backup")
}
defer os.RemoveAll(tempDir)
for _, group := range kb.discoveryHelper.Resources() {
if err := gb.backupGroup(group); err != nil {
log.WithError(err).WithField("apiGroup", group.String()).Error("Error backing up API group")
collector := &itemCollector{
log: log,
backupRequest: backupRequest,
discoveryHelper: kb.discoveryHelper,
dynamicFactory: kb.dynamicFactory,
cohabitatingResources: cohabitatingResources(),
dir: tempDir,
}
items := collector.getAllItems()
log.WithField("progress", "").Infof("Collected %d items matching the backup spec from the Kubernetes API (actual number of items backed up may be more or less depending on velero.io/exclude-from-backup annotation, plugins returning additional related items to back up, etc.)", len(items))
patch := fmt.Sprintf(`{"status":{"progress":{"totalItems":%d}}}`, len(items))
if _, err := kb.backupClient.Backups(backupRequest.Namespace).Patch(backupRequest.Name, types.MergePatchType, []byte(patch)); err != nil {
log.WithError(errors.WithStack((err))).Warn("Got error trying to update backup's status.progress.totalItems")
}
itemBackupper := &itemBackupper{
backupRequest: backupRequest,
tarWriter: tw,
dynamicFactory: kb.dynamicFactory,
discoveryHelper: kb.discoveryHelper,
resticBackupper: resticBackupper,
resticSnapshotTracker: newPVCSnapshotTracker(),
volumeSnapshotterGetter: volumeSnapshotterGetter,
itemHookHandler: &defaultItemHookHandler{
podCommandExecutor: kb.podCommandExecutor,
},
}
// helper struct to send current progress between the main
// backup loop and the gouroutine that periodically patches
// the backup CR with progress updates
type progressUpdate struct {
totalItems, itemsBackedUp int
}
// the main backup process will send on this channel once
// for every item it processes.
update := make(chan progressUpdate)
// the main backup process will send on this channel when
// it's done sending progress updates
quit := make(chan struct{})
// This is the progress updater goroutine that receives
// progress updates on the 'update' channel. It patches
// the backup CR with progress updates at most every second,
// but it will not issue a patch if it hasn't received a new
// update since the previous patch. This goroutine exits
// when it receives on the 'quit' channel.
go func() {
ticker := time.NewTicker(1 * time.Second)
var lastUpdate *progressUpdate
for {
select {
case <-quit:
ticker.Stop()
return
case val := <-update:
lastUpdate = &val
case <-ticker.C:
if lastUpdate != nil {
patch := fmt.Sprintf(`{"status":{"progress":{"totalItems":%d,"itemsBackedUp":%d}}}`, lastUpdate.totalItems, lastUpdate.itemsBackedUp)
if _, err := kb.backupClient.Backups(backupRequest.Namespace).Patch(backupRequest.Name, types.MergePatchType, []byte(patch)); err != nil {
log.WithError(errors.WithStack((err))).Warn("Got error trying to update backup's status.progress")
}
lastUpdate = nil
}
}
}
}()
backedUpGroupResources := map[schema.GroupResource]bool{}
totalItems := len(items)
for i, item := range items {
log.WithFields(map[string]interface{}{
"progress": "",
"resource": item.groupResource.String(),
"namespace": item.namespace,
"name": item.name,
}).Infof("Processing item")
// use an anonymous func so we can defer-close/remove the file
// as soon as we're done with it
func() {
var unstructured unstructured.Unstructured
f, err := os.Open(item.path)
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error opening file containing item")
return
}
defer f.Close()
defer os.Remove(f.Name())
if err := json.NewDecoder(f).Decode(&unstructured); err != nil {
log.WithError(errors.WithStack(err)).Error("Error decoding JSON from file")
return
}
if backedUp := kb.backupItem(log, item.groupResource, itemBackupper, &unstructured, item.preferredGVR); backedUp {
backedUpGroupResources[item.groupResource] = true
}
}()
// updated total is computed as "how many items we've backed up so far, plus
// how many items we know of that are remaining"
totalItems = len(backupRequest.BackedUpItems) + (len(items) - (i + 1))
// send a progress update
update <- progressUpdate{
totalItems: totalItems,
itemsBackedUp: len(backupRequest.BackedUpItems),
}
log.WithFields(map[string]interface{}{
"progress": "",
"resource": item.groupResource.String(),
"namespace": item.namespace,
"name": item.name,
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", len(backupRequest.BackedUpItems), totalItems)
}
// no more progress updates will be sent on the 'update' channel
quit <- struct{}{}
// back up CRD for resource if found. We should only need to do this if we've backed up at least
// one item for the resource and IncludeClusterResources is nil. If IncludeClusterResources is false
// we don't want to back it up, and if it's true it will already be included.
if backupRequest.Spec.IncludeClusterResources == nil {
for gr := range backedUpGroupResources {
kb.backupCRD(log, gr, itemBackupper)
}
}
// do a final update on progress since we may have just added some CRDs and may not have updated
// for the last few processed items.
patch = fmt.Sprintf(`{"status":{"progress":{"totalItems":%d,"itemsBackedUp":%d}}}`, len(backupRequest.BackedUpItems), len(backupRequest.BackedUpItems))
if _, err := kb.backupClient.Backups(backupRequest.Namespace).Patch(backupRequest.Name, types.MergePatchType, []byte(patch)); err != nil {
log.WithError(errors.WithStack((err))).Warn("Got error trying to update backup's status.progress")
}
log.WithField("progress", "").Infof("Backed up a total of %d items", len(backupRequest.BackedUpItems))
return nil
}
func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource) bool {
backedUpItem, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR)
if aggregate, ok := err.(kubeerrs.Aggregate); ok {
log.WithField("name", unstructured.GetName()).Infof("%d errors encountered backup up item", len(aggregate.Errors()))
// log each error separately so we get error location info in the log, and an
// accurate count of errors
for _, err = range aggregate.Errors() {
log.WithError(err).WithField("name", unstructured.GetName()).Error("Error backing up item")
}
return false
}
if err != nil {
log.WithError(err).WithField("name", unstructured.GetName()).Error("Error backing up item")
return false
}
return backedUpItem
}
// backupCRD checks if the resource is a custom resource, and if so, backs up the custom resource definition
// associated with it.
func (kb *kubernetesBackupper) backupCRD(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper) {
crdGroupResource := kuberesource.CustomResourceDefinitions
log.Debugf("Getting server preferred API version for %s", crdGroupResource)
gvr, apiResource, err := kb.discoveryHelper.ResourceFor(crdGroupResource.WithVersion(""))
if err != nil {
log.WithError(errors.WithStack(err)).Errorf("Error getting resolved resource for %s", crdGroupResource)
return
}
log.Debugf("Got server preferred API version %s for %s", gvr.Version, crdGroupResource)
log.Debugf("Getting dynamic client for %s", gvr.String())
crdClient, err := kb.dynamicFactory.ClientForGroupVersionResource(gvr.GroupVersion(), apiResource, "")
if err != nil {
log.WithError(errors.WithStack(err)).Errorf("Error getting dynamic client for %s", crdGroupResource)
return
}
log.Debugf("Got dynamic client for %s", gvr.String())
// try to get a CRD whose name matches the provided GroupResource
unstructured, err := crdClient.Get(gr.String(), metav1.GetOptions{})
if apierrors.IsNotFound(err) {
// not found: this means the GroupResource provided was not a
// custom resource, so there's no CRD to back up.
log.Debugf("No CRD found for GroupResource %s", gr.String())
return
}
if err != nil {
log.WithError(errors.WithStack(err)).Errorf("Error getting CRD %s", gr.String())
return
}
log.Infof("Found associated CRD %s to add to backup", gr.String())
kb.backupItem(log, gvr.GroupResource(), itemBackupper, unstructured, gvr)
}
func (kb *kubernetesBackupper) writeBackupVersion(tw *tar.Writer) error {
versionFile := filepath.Join(api.MetadataDir, "version")
versionFile := filepath.Join(velerov1api.MetadataDir, "version")
versionString := fmt.Sprintf("%s\n", BackupFormatVersion)
hdr := &tar.Header{

View File

@@ -2690,9 +2690,9 @@ func newHarness(t *testing.T) *harness {
return &harness{
APIServer: apiServer,
backupper: &kubernetesBackupper{
dynamicFactory: client.NewDynamicFactory(apiServer.DynamicClient),
discoveryHelper: discoveryHelper,
groupBackupperFactory: new(defaultGroupBackupperFactory),
backupClient: apiServer.VeleroClient.VeleroV1(),
dynamicFactory: client.NewDynamicFactory(apiServer.DynamicClient),
discoveryHelper: discoveryHelper,
// unsupported
podCommandExecutor: nil,

View File

@@ -1,181 +0,0 @@
/*
Copyright 2017 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package backup
import (
"sort"
"strings"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/discovery"
"github.com/vmware-tanzu/velero/pkg/podexec"
"github.com/vmware-tanzu/velero/pkg/restic"
)
type groupBackupperFactory interface {
newGroupBackupper(
log logrus.FieldLogger,
backupRequest *Request,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
cohabitatingResources map[string]*cohabitatingResource,
podCommandExecutor podexec.PodCommandExecutor,
tarWriter tarWriter,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
volumeSnapshotterGetter VolumeSnapshotterGetter,
) groupBackupper
}
type defaultGroupBackupperFactory struct{}
func (f *defaultGroupBackupperFactory) newGroupBackupper(
log logrus.FieldLogger,
backupRequest *Request,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
cohabitatingResources map[string]*cohabitatingResource,
podCommandExecutor podexec.PodCommandExecutor,
tarWriter tarWriter,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
volumeSnapshotterGetter VolumeSnapshotterGetter,
) groupBackupper {
return &defaultGroupBackupper{
log: log,
backupRequest: backupRequest,
dynamicFactory: dynamicFactory,
discoveryHelper: discoveryHelper,
cohabitatingResources: cohabitatingResources,
podCommandExecutor: podCommandExecutor,
tarWriter: tarWriter,
resticBackupper: resticBackupper,
resticSnapshotTracker: resticSnapshotTracker,
volumeSnapshotterGetter: volumeSnapshotterGetter,
resourceBackupperFactory: &defaultResourceBackupperFactory{},
}
}
type groupBackupper interface {
backupGroup(group *metav1.APIResourceList) error
}
type defaultGroupBackupper struct {
log logrus.FieldLogger
backupRequest *Request
dynamicFactory client.DynamicFactory
discoveryHelper discovery.Helper
cohabitatingResources map[string]*cohabitatingResource
podCommandExecutor podexec.PodCommandExecutor
tarWriter tarWriter
resticBackupper restic.Backupper
resticSnapshotTracker *pvcSnapshotTracker
resourceBackupperFactory resourceBackupperFactory
volumeSnapshotterGetter VolumeSnapshotterGetter
}
// backupGroup backs up a single API group.
func (gb *defaultGroupBackupper) backupGroup(group *metav1.APIResourceList) error {
log := gb.log.WithField("group", group.GroupVersion)
log.Infof("Backing up group")
// Parse so we can check if this is the core group
gv, err := schema.ParseGroupVersion(group.GroupVersion)
if err != nil {
return errors.Wrapf(err, "error parsing GroupVersion %q", group.GroupVersion)
}
if gv.Group == "" {
// This is the core group, so make sure we process in the following order: pods, pvcs, pvs,
// everything else.
sortCoreGroup(group)
}
rb := gb.resourceBackupperFactory.newResourceBackupper(
log,
gb.backupRequest,
gb.dynamicFactory,
gb.discoveryHelper,
gb.cohabitatingResources,
gb.podCommandExecutor,
gb.tarWriter,
gb.resticBackupper,
gb.resticSnapshotTracker,
gb.volumeSnapshotterGetter,
)
for _, resource := range group.APIResources {
if err := rb.backupResource(group, resource); err != nil {
log.WithError(err).WithField("resource", resource.String()).Error("Error backing up API resource")
}
}
return nil
}
// sortCoreGroup sorts group as a coreGroup.
func sortCoreGroup(group *metav1.APIResourceList) {
sort.Stable(coreGroup(group.APIResources))
}
// coreGroup is used to sort APIResources in the core API group. The sort order is pods, pvcs, pvs,
// then everything else.
type coreGroup []metav1.APIResource
func (c coreGroup) Len() int {
return len(c)
}
func (c coreGroup) Less(i, j int) bool {
return coreGroupResourcePriority(c[i].Name) < coreGroupResourcePriority(c[j].Name)
}
func (c coreGroup) Swap(i, j int) {
c[j], c[i] = c[i], c[j]
}
// These constants represent the relative priorities for resources in the core API group. We want to
// ensure that we process pods, then pvcs, then pvs, then anything else. This ensures that when a
// pod is backed up, we can perform a pre hook, then process pvcs and pvs (including taking a
// snapshot), then perform a post hook on the pod.
const (
pod = iota
pvc
pv
other
)
// coreGroupResourcePriority returns the relative priority of the resource, in the following order:
// pods, pvcs, pvs, everything else.
func coreGroupResourcePriority(resource string) int {
switch strings.ToLower(resource) {
case "pods":
return pod
case "persistentvolumeclaims":
return pvc
case "persistentvolumes":
return pv
}
return other
}

View File

@@ -33,68 +33,18 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/discovery"
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
"github.com/vmware-tanzu/velero/pkg/podexec"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/volume"
)
type itemBackupperFactory interface {
newItemBackupper(
backup *Request,
podCommandExecutor podexec.PodCommandExecutor,
tarWriter tarWriter,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
volumeSnapshotterGetter VolumeSnapshotterGetter,
) ItemBackupper
}
type defaultItemBackupperFactory struct{}
func (f *defaultItemBackupperFactory) newItemBackupper(
backupRequest *Request,
podCommandExecutor podexec.PodCommandExecutor,
tarWriter tarWriter,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
volumeSnapshotterGetter VolumeSnapshotterGetter,
) ItemBackupper {
ib := &defaultItemBackupper{
backupRequest: backupRequest,
tarWriter: tarWriter,
dynamicFactory: dynamicFactory,
discoveryHelper: discoveryHelper,
resticBackupper: resticBackupper,
resticSnapshotTracker: resticSnapshotTracker,
volumeSnapshotterGetter: volumeSnapshotterGetter,
itemHookHandler: &defaultItemHookHandler{
podCommandExecutor: podCommandExecutor,
},
}
// this is for testing purposes
ib.additionalItemBackupper = ib
return ib
}
type ItemBackupper interface {
backupItem(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, preferredGVR schema.GroupVersionResource) (bool, error)
}
type defaultItemBackupper struct {
// itemBackupper can back up individual items to a tar writer.
type itemBackupper struct {
backupRequest *Request
tarWriter tarWriter
dynamicFactory client.DynamicFactory
@@ -104,7 +54,6 @@ type defaultItemBackupper struct {
volumeSnapshotterGetter VolumeSnapshotterGetter
itemHookHandler itemHookHandler
additionalItemBackupper ItemBackupper
snapshotLocationVolumeSnapshotters map[string]velero.VolumeSnapshotter
}
@@ -112,7 +61,7 @@ type defaultItemBackupper struct {
// namespaces IncludesExcludes list.
// In addition to the error return, backupItem also returns a bool indicating whether the item
// was actually backed up.
func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, preferredGVR schema.GroupVersionResource) (bool, error) {
func (ib *itemBackupper) backupItem(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, preferredGVR schema.GroupVersionResource) (bool, error) {
metadata, err := meta.Accessor(obj)
if err != nil {
return false, err
@@ -268,13 +217,13 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
versionPath := version
if version == preferredVersion {
versionPath = version + api.PreferredVersionDir
versionPath = version + velerov1api.PreferredVersionDir
}
if namespace != "" {
filePath = filepath.Join(api.ResourcesDir, groupResource.String(), versionPath, api.NamespaceScopedDir, namespace, name+".json")
filePath = filepath.Join(velerov1api.ResourcesDir, groupResource.String(), versionPath, velerov1api.NamespaceScopedDir, namespace, name+".json")
} else {
filePath = filepath.Join(api.ResourcesDir, groupResource.String(), versionPath, api.ClusterScopedDir, name+".json")
filePath = filepath.Join(velerov1api.ResourcesDir, groupResource.String(), versionPath, velerov1api.ClusterScopedDir, name+".json")
}
itemBytes, err := json.Marshal(obj.UnstructuredContent())
@@ -302,9 +251,9 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
if version == preferredVersion {
if namespace != "" {
filePath = filepath.Join(api.ResourcesDir, groupResource.String(), api.NamespaceScopedDir, namespace, name+".json")
filePath = filepath.Join(velerov1api.ResourcesDir, groupResource.String(), velerov1api.NamespaceScopedDir, namespace, name+".json")
} else {
filePath = filepath.Join(api.ResourcesDir, groupResource.String(), api.ClusterScopedDir, name+".json")
filePath = filepath.Join(velerov1api.ResourcesDir, groupResource.String(), velerov1api.ClusterScopedDir, name+".json")
}
hdr = &tar.Header{
@@ -330,7 +279,7 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
// backupPodVolumes triggers restic backups of the specified pod volumes, and returns a list of PodVolumeBackups
// for volumes that were successfully backed up, and a slice of any errors that were encountered.
func (ib *defaultItemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *corev1api.Pod, volumes []string) ([]*velerov1api.PodVolumeBackup, []error) {
func (ib *itemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *corev1api.Pod, volumes []string) ([]*velerov1api.PodVolumeBackup, []error) {
if len(volumes) == 0 {
return nil, nil
}
@@ -343,7 +292,7 @@ func (ib *defaultItemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *co
return ib.resticBackupper.BackupPodVolumes(ib.backupRequest.Backup, pod, volumes, log)
}
func (ib *defaultItemBackupper) executeActions(
func (ib *itemBackupper) executeActions(
log logrus.FieldLogger,
obj runtime.Unstructured,
groupResource schema.GroupResource,
@@ -395,7 +344,7 @@ func (ib *defaultItemBackupper) executeActions(
return nil, errors.WithStack(err)
}
if _, err = ib.additionalItemBackupper.backupItem(log, additionalItem, gvr.GroupResource(), gvr); err != nil {
if _, err = ib.backupItem(log, additionalItem, gvr.GroupResource(), gvr); err != nil {
return nil, err
}
}
@@ -406,7 +355,7 @@ func (ib *defaultItemBackupper) executeActions(
// volumeSnapshotter instantiates and initializes a VolumeSnapshotter given a VolumeSnapshotLocation,
// or returns an existing one if one's already been initialized for the location.
func (ib *defaultItemBackupper) volumeSnapshotter(snapshotLocation *api.VolumeSnapshotLocation) (velero.VolumeSnapshotter, error) {
func (ib *itemBackupper) volumeSnapshotter(snapshotLocation *velerov1api.VolumeSnapshotLocation) (velero.VolumeSnapshotter, error) {
if bs, ok := ib.snapshotLocationVolumeSnapshotters[snapshotLocation.Name]; ok {
return bs, nil
}
@@ -440,7 +389,7 @@ const (
// takePVSnapshot triggers a snapshot for the volume/disk underlying a PersistentVolume if the provided
// backup has volume snapshots enabled and the PV is of a compatible type. Also records cloud
// disk type and IOPS (if applicable) to be able to restore to current state later.
func (ib *defaultItemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.FieldLogger) error {
func (ib *itemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.FieldLogger) error {
log.Info("Executing takePVSnapshot")
if boolptr.IsSetToFalse(ib.backupRequest.Spec.SnapshotVolumes) {
@@ -543,7 +492,7 @@ func (ib *defaultItemBackupper) takePVSnapshot(obj runtime.Unstructured, log log
return kubeerrs.NewAggregate(errs)
}
func volumeSnapshot(backup *api.Backup, volumeName, volumeID, volumeType, az, location string, iops *int64) *volume.Snapshot {
func volumeSnapshot(backup *velerov1api.Backup, volumeName, volumeID, volumeType, az, location string, iops *int64) *volume.Snapshot {
return &volume.Snapshot{
Spec: volume.SnapshotSpec{
BackupName: backup.Name,

View File

@@ -0,0 +1,360 @@
/*
Copyright 2017, 2020 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package backup
import (
"encoding/json"
"io/ioutil"
"sort"
"strings"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/discovery"
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/vmware-tanzu/velero/pkg/util/collections"
)
// itemCollector collects items from the Kubernetes API according to
// the backup spec and writes them to files inside dir.
type itemCollector struct {
log logrus.FieldLogger
backupRequest *Request
discoveryHelper discovery.Helper
dynamicFactory client.DynamicFactory
cohabitatingResources map[string]*cohabitatingResource
dir string
}
type kubernetesResource struct {
groupResource schema.GroupResource
preferredGVR schema.GroupVersionResource
namespace, name, path string
}
// getAllItems gets all relevant items from all API groups.
func (r *itemCollector) getAllItems() []*kubernetesResource {
var resources []*kubernetesResource
for _, group := range r.discoveryHelper.Resources() {
groupItems, err := r.getGroupItems(r.log, group)
if err != nil {
r.log.WithError(err).WithField("apiGroup", group.String()).Error("Error collecting resources from API group")
continue
}
resources = append(resources, groupItems...)
}
return resources
}
// getGroupItems collects all relevant items from a single API group.
func (r *itemCollector) getGroupItems(log logrus.FieldLogger, group *metav1.APIResourceList) ([]*kubernetesResource, error) {
log = log.WithField("group", group.GroupVersion)
log.Infof("Getting items for group")
// Parse so we can check if this is the core group
gv, err := schema.ParseGroupVersion(group.GroupVersion)
if err != nil {
return nil, errors.Wrapf(err, "error parsing GroupVersion %q", group.GroupVersion)
}
if gv.Group == "" {
// This is the core group, so make sure we process in the following order: pods, pvcs, pvs,
// everything else.
sortCoreGroup(group)
}
var items []*kubernetesResource
for _, resource := range group.APIResources {
resourceItems, err := r.getResourceItems(log, gv, resource)
if err != nil {
log.WithError(err).WithField("resource", resource.String()).Error("Error getting items for resource")
continue
}
items = append(items, resourceItems...)
}
return items, nil
}
// getResourceItems collects all relevant items for a given group-version-resource.
func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.GroupVersion, resource metav1.APIResource) ([]*kubernetesResource, error) {
log = log.WithField("resource", resource.Name)
log.Info("Getting items for resource")
var (
gvr = gv.WithResource(resource.Name)
gr = gvr.GroupResource()
clusterScoped = !resource.Namespaced
)
// Getting the preferred group version of this resource
preferredGVR, _, err := r.discoveryHelper.ResourceFor(gr.WithVersion(""))
if err != nil {
return nil, errors.WithStack(err)
}
// If the resource we are backing up is NOT namespaces, and it is cluster-scoped, check to see if
// we should include it based on the IncludeClusterResources setting.
if gr != kuberesource.Namespaces && clusterScoped {
if r.backupRequest.Spec.IncludeClusterResources == nil {
if !r.backupRequest.NamespaceIncludesExcludes.IncludeEverything() {
// when IncludeClusterResources == nil (auto), only directly
// back up cluster-scoped resources if we're doing a full-cluster
// (all namespaces) backup. Note that in the case of a subset of
// namespaces being backed up, some related cluster-scoped resources
// may still be backed up if triggered by a custom action (e.g. PVC->PV).
// If we're processing namespaces themselves, we will not skip here, they may be
// filtered out later.
log.Info("Skipping resource because it's cluster-scoped and only specific namespaces are included in the backup")
return nil, nil
}
} else if !*r.backupRequest.Spec.IncludeClusterResources {
log.Info("Skipping resource because it's cluster-scoped")
return nil, nil
}
}
if !r.backupRequest.ResourceIncludesExcludes.ShouldInclude(gr.String()) {
log.Infof("Skipping resource because it's excluded")
return nil, nil
}
if cohabitator, found := r.cohabitatingResources[resource.Name]; found {
if cohabitator.seen {
log.WithFields(
logrus.Fields{
"cohabitatingResource1": cohabitator.groupResource1.String(),
"cohabitatingResource2": cohabitator.groupResource2.String(),
},
).Infof("Skipping resource because it cohabitates and we've already processed it")
return nil, nil
}
cohabitator.seen = true
}
namespacesToList := getNamespacesToList(r.backupRequest.NamespaceIncludesExcludes)
// Check if we're backing up namespaces, and only certain ones
if gr == kuberesource.Namespaces && namespacesToList[0] != "" {
resourceClient, err := r.dynamicFactory.ClientForGroupVersionResource(gv, resource, "")
if err != nil {
log.WithError(err).Error("Error getting dynamic client")
} else {
var labelSelector labels.Selector
if r.backupRequest.Spec.LabelSelector != nil {
labelSelector, err = metav1.LabelSelectorAsSelector(r.backupRequest.Spec.LabelSelector)
if err != nil {
// This should never happen...
return nil, errors.Wrap(err, "invalid label selector")
}
}
var items []*kubernetesResource
for _, ns := range namespacesToList {
log = log.WithField("namespace", ns)
log.Info("Getting namespace")
unstructured, err := resourceClient.Get(ns, metav1.GetOptions{})
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error getting namespace")
continue
}
labels := labels.Set(unstructured.GetLabels())
if labelSelector != nil && !labelSelector.Matches(labels) {
log.Info("Skipping namespace because it does not match the backup's label selector")
continue
}
path, err := r.writeToFile(unstructured)
if err != nil {
log.WithError(err).Error("Error writing item to file")
continue
}
items = append(items, &kubernetesResource{
groupResource: gr,
preferredGVR: preferredGVR,
name: ns,
path: path,
})
}
return items, nil
}
}
// If we get here, we're backing up something other than namespaces
if clusterScoped {
namespacesToList = []string{""}
}
var items []*kubernetesResource
for _, namespace := range namespacesToList {
log = log.WithField("namespace", namespace)
resourceClient, err := r.dynamicFactory.ClientForGroupVersionResource(gv, resource, namespace)
if err != nil {
log.WithError(err).Error("Error getting dynamic client")
continue
}
var labelSelector string
if selector := r.backupRequest.Spec.LabelSelector; selector != nil {
labelSelector = metav1.FormatLabelSelector(selector)
}
log.Info("Listing items")
unstructuredList, err := resourceClient.List(metav1.ListOptions{LabelSelector: labelSelector})
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error listing items")
continue
}
log.Infof("Retrieved %d items", len(unstructuredList.Items))
// collect the items
for i := range unstructuredList.Items {
item := &unstructuredList.Items[i]
if gr == kuberesource.Namespaces && !r.backupRequest.NamespaceIncludesExcludes.ShouldInclude(item.GetName()) {
log.WithField("name", item.GetName()).Info("Skipping namespace because it's excluded")
continue
}
path, err := r.writeToFile(item)
if err != nil {
log.WithError(err).Error("Error writing item to file")
continue
}
items = append(items, &kubernetesResource{
groupResource: gr,
preferredGVR: preferredGVR,
namespace: item.GetNamespace(),
name: item.GetName(),
path: path,
})
}
}
return items, nil
}
func (r *itemCollector) writeToFile(item *unstructured.Unstructured) (string, error) {
f, err := ioutil.TempFile(r.dir, "")
if err != nil {
return "", errors.Wrap(err, "error creating temp file")
}
defer f.Close()
jsonBytes, err := json.Marshal(item)
if err != nil {
return "", errors.Wrap(err, "error converting item to JSON")
}
if _, err := f.Write(jsonBytes); err != nil {
return "", errors.Wrap(err, "error writing JSON to file")
}
if err := f.Close(); err != nil {
return "", errors.Wrap(err, "error closing file")
}
return f.Name(), nil
}
// sortCoreGroup sorts the core API group.
func sortCoreGroup(group *metav1.APIResourceList) {
sort.SliceStable(group.APIResources, func(i, j int) bool {
return coreGroupResourcePriority(group.APIResources[i].Name) < coreGroupResourcePriority(group.APIResources[j].Name)
})
}
// These constants represent the relative priorities for resources in the core API group. We want to
// ensure that we process pods, then pvcs, then pvs, then anything else. This ensures that when a
// pod is backed up, we can perform a pre hook, then process pvcs and pvs (including taking a
// snapshot), then perform a post hook on the pod.
const (
pod = iota
pvc
pv
other
)
// coreGroupResourcePriority returns the relative priority of the resource, in the following order:
// pods, pvcs, pvs, everything else.
func coreGroupResourcePriority(resource string) int {
switch strings.ToLower(resource) {
case "pods":
return pod
case "persistentvolumeclaims":
return pvc
case "persistentvolumes":
return pv
}
return other
}
// getNamespacesToList examines ie and resolves the includes and excludes to a full list of
// namespaces to list. If ie is nil or it includes *, the result is just "" (list across all
// namespaces). Otherwise, the result is a list of every included namespace minus all excluded ones.
func getNamespacesToList(ie *collections.IncludesExcludes) []string {
if ie == nil {
return []string{""}
}
if ie.ShouldInclude("*") {
// "" means all namespaces
return []string{""}
}
var list []string
for _, i := range ie.GetIncludes() {
if ie.ShouldInclude(i) {
list = append(list, i)
}
}
return list
}
type cohabitatingResource struct {
resource string
groupResource1 schema.GroupResource
groupResource2 schema.GroupResource
seen bool
}
func newCohabitatingResource(resource, group1, group2 string) *cohabitatingResource {
return &cohabitatingResource{
resource: resource,
groupResource1: schema.GroupResource{Group: group1, Resource: resource},
groupResource2: schema.GroupResource{Group: group2, Resource: resource},
seen: false,
}
}

View File

@@ -1,5 +1,5 @@
/*
Copyright 2017, 2019 the Velero contributors.
Copyright 2017, 2019, 2020 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@@ -1,374 +0,0 @@
/*
Copyright 2017 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package backup
import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/discovery"
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/vmware-tanzu/velero/pkg/podexec"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/util/collections"
)
type resourceBackupperFactory interface {
newResourceBackupper(
log logrus.FieldLogger,
backupRequest *Request,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
cohabitatingResources map[string]*cohabitatingResource,
podCommandExecutor podexec.PodCommandExecutor,
tarWriter tarWriter,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
volumeSnapshotterGetter VolumeSnapshotterGetter,
) resourceBackupper
}
type defaultResourceBackupperFactory struct{}
func (f *defaultResourceBackupperFactory) newResourceBackupper(
log logrus.FieldLogger,
backupRequest *Request,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
cohabitatingResources map[string]*cohabitatingResource,
podCommandExecutor podexec.PodCommandExecutor,
tarWriter tarWriter,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
volumeSnapshotterGetter VolumeSnapshotterGetter,
) resourceBackupper {
return &defaultResourceBackupper{
log: log,
backupRequest: backupRequest,
dynamicFactory: dynamicFactory,
discoveryHelper: discoveryHelper,
cohabitatingResources: cohabitatingResources,
podCommandExecutor: podCommandExecutor,
tarWriter: tarWriter,
resticBackupper: resticBackupper,
resticSnapshotTracker: resticSnapshotTracker,
volumeSnapshotterGetter: volumeSnapshotterGetter,
itemBackupperFactory: &defaultItemBackupperFactory{},
}
}
type resourceBackupper interface {
backupResource(group *metav1.APIResourceList, resource metav1.APIResource) error
}
type defaultResourceBackupper struct {
log logrus.FieldLogger
backupRequest *Request
dynamicFactory client.DynamicFactory
discoveryHelper discovery.Helper
cohabitatingResources map[string]*cohabitatingResource
podCommandExecutor podexec.PodCommandExecutor
tarWriter tarWriter
resticBackupper restic.Backupper
resticSnapshotTracker *pvcSnapshotTracker
itemBackupperFactory itemBackupperFactory
volumeSnapshotterGetter VolumeSnapshotterGetter
}
// backupResource backs up all the objects for a given group-version-resource.
func (rb *defaultResourceBackupper) backupResource(group *metav1.APIResourceList, resource metav1.APIResource) error {
log := rb.log.WithField("resource", resource.Name)
log.Info("Backing up resource")
gv, err := schema.ParseGroupVersion(group.GroupVersion)
if err != nil {
return errors.Wrapf(err, "error parsing GroupVersion %s", group.GroupVersion)
}
gr := schema.GroupResource{Group: gv.Group, Resource: resource.Name}
// Getting the preferred group version of this resource
preferredGVR, _, err := rb.discoveryHelper.ResourceFor(gr.WithVersion(""))
if err != nil {
return errors.WithStack(err)
}
clusterScoped := !resource.Namespaced
// If the resource we are backing up is NOT namespaces, and it is cluster-scoped, check to see if
// we should include it based on the IncludeClusterResources setting.
if gr != kuberesource.Namespaces && clusterScoped {
if rb.backupRequest.Spec.IncludeClusterResources == nil {
if !rb.backupRequest.NamespaceIncludesExcludes.IncludeEverything() {
// when IncludeClusterResources == nil (auto), only directly
// back up cluster-scoped resources if we're doing a full-cluster
// (all namespaces) backup. Note that in the case of a subset of
// namespaces being backed up, some related cluster-scoped resources
// may still be backed up if triggered by a custom action (e.g. PVC->PV).
// If we're processing namespaces themselves, we will not skip here, they may be
// filtered out later.
log.Info("Skipping resource because it's cluster-scoped and only specific namespaces are included in the backup")
return nil
}
} else if !*rb.backupRequest.Spec.IncludeClusterResources {
log.Info("Skipping resource because it's cluster-scoped")
return nil
}
}
if !rb.backupRequest.ResourceIncludesExcludes.ShouldInclude(gr.String()) {
log.Infof("Skipping resource because it's excluded")
return nil
}
if cohabitator, found := rb.cohabitatingResources[resource.Name]; found {
if cohabitator.seen {
log.WithFields(
logrus.Fields{
"cohabitatingResource1": cohabitator.groupResource1.String(),
"cohabitatingResource2": cohabitator.groupResource2.String(),
},
).Infof("Skipping resource because it cohabitates and we've already processed it")
return nil
}
cohabitator.seen = true
}
itemBackupper := rb.itemBackupperFactory.newItemBackupper(
rb.backupRequest,
rb.podCommandExecutor,
rb.tarWriter,
rb.dynamicFactory,
rb.discoveryHelper,
rb.resticBackupper,
rb.resticSnapshotTracker,
rb.volumeSnapshotterGetter,
)
namespacesToList := getNamespacesToList(rb.backupRequest.NamespaceIncludesExcludes)
// Check if we're backing up namespaces, and only certain ones
if gr == kuberesource.Namespaces && namespacesToList[0] != "" {
resourceClient, err := rb.dynamicFactory.ClientForGroupVersionResource(gv, resource, "")
if err != nil {
log.WithError(err).Error("Error getting dynamic client")
} else {
var labelSelector labels.Selector
if rb.backupRequest.Spec.LabelSelector != nil {
labelSelector, err = metav1.LabelSelectorAsSelector(rb.backupRequest.Spec.LabelSelector)
if err != nil {
// This should never happen...
return errors.Wrap(err, "invalid label selector")
}
}
for _, ns := range namespacesToList {
log = log.WithField("namespace", ns)
log.Info("Getting namespace")
unstructured, err := resourceClient.Get(ns, metav1.GetOptions{})
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error getting namespace")
continue
}
labels := labels.Set(unstructured.GetLabels())
if labelSelector != nil && !labelSelector.Matches(labels) {
log.Info("Skipping namespace because it does not match the backup's label selector")
continue
}
if _, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR); err != nil {
log.WithError(errors.WithStack(err)).Error("Error backing up namespace")
}
}
return nil
}
}
// If we get here, we're backing up something other than namespaces
if clusterScoped {
namespacesToList = []string{""}
}
backedUpItem := false
for _, namespace := range namespacesToList {
log = log.WithField("namespace", namespace)
resourceClient, err := rb.dynamicFactory.ClientForGroupVersionResource(gv, resource, namespace)
if err != nil {
log.WithError(err).Error("Error getting dynamic client")
continue
}
var labelSelector string
if selector := rb.backupRequest.Spec.LabelSelector; selector != nil {
labelSelector = metav1.FormatLabelSelector(selector)
}
log.Info("Listing items")
unstructuredList, err := resourceClient.List(metav1.ListOptions{LabelSelector: labelSelector})
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error listing items")
continue
}
log.Infof("Retrieved %d items", len(unstructuredList.Items))
// do the backup
for _, item := range unstructuredList.Items {
if rb.backupItem(log, gr, itemBackupper, &item, preferredGVR) {
backedUpItem = true
}
}
}
// back up CRD for resource if found. We should only need to do this if we've backed up at least
// one item and IncludeClusterResources is nil. If IncludeClusterResources is false
// we don't want to back it up, and if it's true it will already be included.
if backedUpItem && rb.backupRequest.Spec.IncludeClusterResources == nil {
rb.backupCRD(log, gr, itemBackupper)
}
return nil
}
func (rb *defaultResourceBackupper) backupItem(
log logrus.FieldLogger,
gr schema.GroupResource,
itemBackupper ItemBackupper,
unstructured runtime.Unstructured,
preferredGVR schema.GroupVersionResource,
) bool {
metadata, err := meta.Accessor(unstructured)
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error getting a metadata accessor")
return false
}
log = log.WithFields(map[string]interface{}{
"namespace": metadata.GetNamespace(),
"name": metadata.GetName(),
})
if gr == kuberesource.Namespaces && !rb.backupRequest.NamespaceIncludesExcludes.ShouldInclude(metadata.GetName()) {
log.Info("Skipping namespace because it's excluded")
return false
}
backedUpItem, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR)
if aggregate, ok := err.(kubeerrs.Aggregate); ok {
log.Infof("%d errors encountered backup up item", len(aggregate.Errors()))
// log each error separately so we get error location info in the log, and an
// accurate count of errors
for _, err = range aggregate.Errors() {
log.WithError(err).Error("Error backing up item")
}
return false
}
if err != nil {
log.WithError(err).Error("Error backing up item")
return false
}
return backedUpItem
}
// backupCRD checks if the resource is a custom resource, and if so, backs up the custom resource definition
// associated with it.
func (rb *defaultResourceBackupper) backupCRD(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper ItemBackupper) {
crdGroupResource := kuberesource.CustomResourceDefinitions
log.Debugf("Getting server preferred API version for %s", crdGroupResource)
gvr, apiResource, err := rb.discoveryHelper.ResourceFor(crdGroupResource.WithVersion(""))
if err != nil {
log.WithError(errors.WithStack(err)).Errorf("Error getting resolved resource for %s", crdGroupResource)
return
}
log.Debugf("Got server preferred API version %s for %s", gvr.Version, crdGroupResource)
log.Debugf("Getting dynamic client for %s", gvr.String())
crdClient, err := rb.dynamicFactory.ClientForGroupVersionResource(gvr.GroupVersion(), apiResource, "")
if err != nil {
log.WithError(errors.WithStack(err)).Errorf("Error getting dynamic client for %s", crdGroupResource)
return
}
log.Debugf("Got dynamic client for %s", gvr.String())
// try to get a CRD whose name matches the provided GroupResource
unstructured, err := crdClient.Get(gr.String(), metav1.GetOptions{})
if apierrors.IsNotFound(err) {
// not found: this means the GroupResource provided was not a
// custom resource, so there's no CRD to back up.
log.Debugf("No CRD found for GroupResource %s", gr.String())
return
}
if err != nil {
log.WithError(errors.WithStack(err)).Errorf("Error getting CRD %s", gr.String())
return
}
log.Infof("Found associated CRD %s to add to backup", gr.String())
rb.backupItem(log, gvr.GroupResource(), itemBackupper, unstructured, gvr)
}
// getNamespacesToList examines ie and resolves the includes and excludes to a full list of
// namespaces to list. If ie is nil or it includes *, the result is just "" (list across all
// namespaces). Otherwise, the result is a list of every included namespace minus all excluded ones.
func getNamespacesToList(ie *collections.IncludesExcludes) []string {
if ie == nil {
return []string{""}
}
if ie.ShouldInclude("*") {
// "" means all namespaces
return []string{""}
}
var list []string
for _, i := range ie.GetIncludes() {
if ie.ShouldInclude(i) {
list = append(list, i)
}
}
return list
}
type cohabitatingResource struct {
resource string
groupResource1 schema.GroupResource
groupResource2 schema.GroupResource
seen bool
}
func newCohabitatingResource(resource, group1, group2 string) *cohabitatingResource {
return &cohabitatingResource{
resource: resource,
groupResource1: schema.GroupResource{Group: group1, Resource: resource},
groupResource2: schema.GroupResource{Group: group2, Resource: resource},
seen: false,
}
}

View File

@@ -632,6 +632,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
backupControllerRunInfo := func() controllerRunInfo {
backupper, err := backup.NewKubernetesBackupper(
s.veleroClient.VeleroV1(),
s.discoveryHelper,
client.NewDynamicFactory(s.dynamicClient),
podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),

View File

@@ -238,6 +238,18 @@ func DescribeBackupStatus(d *Describer, backup *velerov1api.Backup, details bool
d.Printf("Expiration:\t%s\n", status.Expiration)
d.Println()
if backup.Status.Progress != nil {
if backup.Status.Phase == velerov1api.BackupPhaseInProgress {
d.Printf("Estimated total items to be backed up:\t%d\n", backup.Status.Progress.TotalItems)
d.Printf("Items backed up so far:\t%d\n", backup.Status.Progress.ItemsBackedUp)
} else {
d.Printf("Total items to be backed up:\t%d\n", backup.Status.Progress.TotalItems)
d.Printf("Items backed up:\t%d\n", backup.Status.Progress.ItemsBackedUp)
}
d.Println()
}
if details {
describeBackupResourceList(d, backup, veleroClient, insecureSkipTLSVerify, caCertPath)
d.Println()

File diff suppressed because one or more lines are too long

View File

@@ -355,6 +355,23 @@ spec:
- Failed
- Deleting
type: string
progress:
description: Progress contains information about the backup's execution
progress. Note that this information is best-effort only -- if Velero
fails to update it during a backup for any reason, it may be inaccurate/stale.
properties:
itemsBackedUp:
description: ItemsBackedUp is the number of items that have actually
been written to the backup tarball so far.
type: integer
totalItems:
description: TotalItems is the total number of items to be backed
up. This number may change throughout the execution of the backup
due to plugins that return additional related items to back up,
the velero.io/exclude-from-backup label, and various other filters
that happen as items are processed.
type: integer
type: object
startTimestamp:
description: StartTimestamp records the time a backup was started. Separate
from CreationTimestamp, since that value changes on restores. The