Perf improvements for existing resource restore

Use informer cache with dynamic client for Get calls on restore
When enabled, also make the Get call before create.

Add server and install parameters to allow disabling this feature,
but enable it by default

Signed-off-by: Scott Seago <sseago@redhat.com>
This commit is contained in:
Scott Seago
2023-08-21 11:36:46 -04:00
parent b31610157d
commit 7750e12151
14 changed files with 243 additions and 34 deletions

View File

@@ -0,0 +1 @@
Perf improvements for existing resource restore

View File

@@ -18,6 +18,7 @@ package client
import ( import (
"context" "context"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -25,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
) )
// DynamicFactory contains methods for retrieving dynamic clients for GroupVersionResources and // DynamicFactory contains methods for retrieving dynamic clients for GroupVersionResources and
@@ -33,6 +35,8 @@ type DynamicFactory interface {
// ClientForGroupVersionResource returns a Dynamic client for the given group/version // ClientForGroupVersionResource returns a Dynamic client for the given group/version
// and resource for the given namespace. // and resource for the given namespace.
ClientForGroupVersionResource(gv schema.GroupVersion, resource metav1.APIResource, namespace string) (Dynamic, error) ClientForGroupVersionResource(gv schema.GroupVersion, resource metav1.APIResource, namespace string) (Dynamic, error)
// DynamicSharedInformerFactoryForNamespace returns a DynamicSharedInformerFactory for the given namespace.
DynamicSharedInformerFactoryForNamespace(namespace string) dynamicinformer.DynamicSharedInformerFactory
} }
// dynamicFactory implements DynamicFactory. // dynamicFactory implements DynamicFactory.
@@ -51,6 +55,10 @@ func (f *dynamicFactory) ClientForGroupVersionResource(gv schema.GroupVersion, r
}, nil }, nil
} }
// DynamicSharedInformerFactoryForNamespace returns a dynamic shared informer
// factory scoped to the given namespace, with a one-minute resync period.
func (f *dynamicFactory) DynamicSharedInformerFactoryForNamespace(namespace string) dynamicinformer.DynamicSharedInformerFactory {
	// nil tweakListOptions: no list filtering beyond the namespace scope.
	return dynamicinformer.NewFilteredDynamicSharedInformerFactory(f.dynamicClient, time.Minute, namespace, nil)
}
// Creator creates an object. // Creator creates an object.
type Creator interface { type Creator interface {
// Create creates an object. // Create creates an object.

View File

@@ -81,6 +81,7 @@ type Options struct {
DefaultVolumesToFsBackup bool DefaultVolumesToFsBackup bool
UploaderType string UploaderType string
DefaultSnapshotMoveData bool DefaultSnapshotMoveData bool
DisableInformerCache bool
} }
// BindFlags adds command line values to the options struct. // BindFlags adds command line values to the options struct.
@@ -122,6 +123,7 @@ func (o *Options) BindFlags(flags *pflag.FlagSet) {
flags.BoolVar(&o.DefaultVolumesToFsBackup, "default-volumes-to-fs-backup", o.DefaultVolumesToFsBackup, "Bool flag to configure Velero server to use pod volume file system backup by default for all volumes on all backups. Optional.") flags.BoolVar(&o.DefaultVolumesToFsBackup, "default-volumes-to-fs-backup", o.DefaultVolumesToFsBackup, "Bool flag to configure Velero server to use pod volume file system backup by default for all volumes on all backups. Optional.")
flags.StringVar(&o.UploaderType, "uploader-type", o.UploaderType, fmt.Sprintf("The type of uploader to transfer the data of pod volumes, the supported values are '%s', '%s'", uploader.ResticType, uploader.KopiaType)) flags.StringVar(&o.UploaderType, "uploader-type", o.UploaderType, fmt.Sprintf("The type of uploader to transfer the data of pod volumes, the supported values are '%s', '%s'", uploader.ResticType, uploader.KopiaType))
flags.BoolVar(&o.DefaultSnapshotMoveData, "default-snapshot-move-data", o.DefaultSnapshotMoveData, "Bool flag to configure Velero server to move data by default for all snapshots supporting data movement. Optional.") flags.BoolVar(&o.DefaultSnapshotMoveData, "default-snapshot-move-data", o.DefaultSnapshotMoveData, "Bool flag to configure Velero server to move data by default for all snapshots supporting data movement. Optional.")
flags.BoolVar(&o.DisableInformerCache, "disable-informer-cache", o.DisableInformerCache, "Disable informer cache for Get calls on restore. With this enabled, it will speed up restore in cases where there are backup resources which already exist in the cluster, but for very large clusters this will increase velero memory usage. Default is false (don't disable). Optional.")
} }
// NewInstallOptions instantiates a new, default InstallOptions struct. // NewInstallOptions instantiates a new, default InstallOptions struct.
@@ -149,6 +151,7 @@ func NewInstallOptions() *Options {
DefaultVolumesToFsBackup: false, DefaultVolumesToFsBackup: false,
UploaderType: uploader.KopiaType, UploaderType: uploader.KopiaType,
DefaultSnapshotMoveData: false, DefaultSnapshotMoveData: false,
DisableInformerCache: true,
} }
} }
@@ -213,6 +216,7 @@ func (o *Options) AsVeleroOptions() (*install.VeleroOptions, error) {
DefaultVolumesToFsBackup: o.DefaultVolumesToFsBackup, DefaultVolumesToFsBackup: o.DefaultVolumesToFsBackup,
UploaderType: o.UploaderType, UploaderType: o.UploaderType,
DefaultSnapshotMoveData: o.DefaultSnapshotMoveData, DefaultSnapshotMoveData: o.DefaultSnapshotMoveData,
DisableInformerCache: o.DisableInformerCache,
}, nil }, nil
} }

View File

@@ -112,6 +112,7 @@ const (
defaultCredentialsDirectory = "/tmp/credentials" defaultCredentialsDirectory = "/tmp/credentials"
defaultMaxConcurrentK8SConnections = 30 defaultMaxConcurrentK8SConnections = 30
defaultDisableInformerCache = false
) )
type serverConfig struct { type serverConfig struct {
@@ -136,6 +137,7 @@ type serverConfig struct {
uploaderType string uploaderType string
maxConcurrentK8SConnections int maxConcurrentK8SConnections int
defaultSnapshotMoveData bool defaultSnapshotMoveData bool
disableInformerCache bool
} }
func NewCommand(f client.Factory) *cobra.Command { func NewCommand(f client.Factory) *cobra.Command {
@@ -165,6 +167,7 @@ func NewCommand(f client.Factory) *cobra.Command {
uploaderType: uploader.ResticType, uploaderType: uploader.ResticType,
maxConcurrentK8SConnections: defaultMaxConcurrentK8SConnections, maxConcurrentK8SConnections: defaultMaxConcurrentK8SConnections,
defaultSnapshotMoveData: false, defaultSnapshotMoveData: false,
disableInformerCache: defaultDisableInformerCache,
} }
) )
@@ -236,6 +239,7 @@ func NewCommand(f client.Factory) *cobra.Command {
command.Flags().DurationVar(&config.resourceTimeout, "resource-timeout", config.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters. Default is 10 minutes.") command.Flags().DurationVar(&config.resourceTimeout, "resource-timeout", config.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters. Default is 10 minutes.")
command.Flags().IntVar(&config.maxConcurrentK8SConnections, "max-concurrent-k8s-connections", config.maxConcurrentK8SConnections, "Max concurrent connections number that Velero can create with kube-apiserver. Default is 30.") command.Flags().IntVar(&config.maxConcurrentK8SConnections, "max-concurrent-k8s-connections", config.maxConcurrentK8SConnections, "Max concurrent connections number that Velero can create with kube-apiserver. Default is 30.")
command.Flags().BoolVar(&config.defaultSnapshotMoveData, "default-snapshot-move-data", config.defaultSnapshotMoveData, "Move data by default for all snapshots supporting data movement.") command.Flags().BoolVar(&config.defaultSnapshotMoveData, "default-snapshot-move-data", config.defaultSnapshotMoveData, "Move data by default for all snapshots supporting data movement.")
command.Flags().BoolVar(&config.disableInformerCache, "disable-informer-cache", config.disableInformerCache, "Disable informer cache for Get calls on restore. WIth this enabled, it will speed up restore in cases where there are backup resources which already exist in the cluster, but for very large clusters this will increase velero memory usage. Default is false (don't disable).")
return command return command
} }
@@ -936,6 +940,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.metrics, s.metrics,
s.config.formatFlag.Parse(), s.config.formatFlag.Parse(),
s.config.defaultItemOperationTimeout, s.config.defaultItemOperationTimeout,
s.config.disableInformerCache,
) )
if err = r.SetupWithManager(s.mgr); err != nil { if err = r.SetupWithManager(s.mgr); err != nil {

View File

@@ -101,6 +101,7 @@ type restoreReconciler struct {
logFormat logging.Format logFormat logging.Format
clock clock.WithTickerAndDelayedExecution clock clock.WithTickerAndDelayedExecution
defaultItemOperationTimeout time.Duration defaultItemOperationTimeout time.Duration
disableInformerCache bool
newPluginManager func(logger logrus.FieldLogger) clientmgmt.Manager newPluginManager func(logger logrus.FieldLogger) clientmgmt.Manager
backupStoreGetter persistence.ObjectBackupStoreGetter backupStoreGetter persistence.ObjectBackupStoreGetter
@@ -123,6 +124,7 @@ func NewRestoreReconciler(
metrics *metrics.ServerMetrics, metrics *metrics.ServerMetrics,
logFormat logging.Format, logFormat logging.Format,
defaultItemOperationTimeout time.Duration, defaultItemOperationTimeout time.Duration,
disableInformerCache bool,
) *restoreReconciler { ) *restoreReconciler {
r := &restoreReconciler{ r := &restoreReconciler{
ctx: ctx, ctx: ctx,
@@ -135,6 +137,7 @@ func NewRestoreReconciler(
logFormat: logFormat, logFormat: logFormat,
clock: &clock.RealClock{}, clock: &clock.RealClock{},
defaultItemOperationTimeout: defaultItemOperationTimeout, defaultItemOperationTimeout: defaultItemOperationTimeout,
disableInformerCache: disableInformerCache,
// use variables to refer to these functions so they can be // use variables to refer to these functions so they can be
// replaced with fakes for testing. // replaced with fakes for testing.
@@ -519,13 +522,14 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu
} }
restoreReq := &pkgrestore.Request{ restoreReq := &pkgrestore.Request{
Log: restoreLog, Log: restoreLog,
Restore: restore, Restore: restore,
Backup: info.backup, Backup: info.backup,
PodVolumeBackups: podVolumeBackups, PodVolumeBackups: podVolumeBackups,
VolumeSnapshots: volumeSnapshots, VolumeSnapshots: volumeSnapshots,
BackupReader: backupFile, BackupReader: backupFile,
ResourceModifiers: resourceModifiers, ResourceModifiers: resourceModifiers,
DisableInformerCache: r.disableInformerCache,
} }
restoreWarnings, restoreErrors := r.restorer.RestoreWithResolvers(restoreReq, actionsResolver, pluginManager) restoreWarnings, restoreErrors := r.restorer.RestoreWithResolvers(restoreReq, actionsResolver, pluginManager)

View File

@@ -114,6 +114,7 @@ func TestFetchBackupInfo(t *testing.T) {
metrics.NewServerMetrics(), metrics.NewServerMetrics(),
formatFlag, formatFlag,
60*time.Minute, 60*time.Minute,
false,
) )
if test.backupStoreError == nil { if test.backupStoreError == nil {
@@ -191,6 +192,7 @@ func TestProcessQueueItemSkips(t *testing.T) {
metrics.NewServerMetrics(), metrics.NewServerMetrics(),
formatFlag, formatFlag,
60*time.Minute, 60*time.Minute,
false,
) )
_, err := r.Reconcile(context.Background(), ctrl.Request{NamespacedName: types.NamespacedName{ _, err := r.Reconcile(context.Background(), ctrl.Request{NamespacedName: types.NamespacedName{
@@ -445,6 +447,7 @@ func TestRestoreReconcile(t *testing.T) {
metrics.NewServerMetrics(), metrics.NewServerMetrics(),
formatFlag, formatFlag,
60*time.Minute, 60*time.Minute,
false,
) )
r.clock = clocktesting.NewFakeClock(now) r.clock = clocktesting.NewFakeClock(now)
@@ -616,6 +619,7 @@ func TestValidateAndCompleteWhenScheduleNameSpecified(t *testing.T) {
metrics.NewServerMetrics(), metrics.NewServerMetrics(),
formatFlag, formatFlag,
60*time.Minute, 60*time.Minute,
false,
) )
restore := &velerov1api.Restore{ restore := &velerov1api.Restore{
@@ -708,6 +712,7 @@ func TestValidateAndCompleteWithResourceModifierSpecified(t *testing.T) {
metrics.NewServerMetrics(), metrics.NewServerMetrics(),
formatFlag, formatFlag,
60*time.Minute, 60*time.Minute,
false,
) )
restore := &velerov1api.Restore{ restore := &velerov1api.Restore{

View File

@@ -48,6 +48,7 @@ type podTemplateConfig struct {
uploaderType string uploaderType string
defaultSnapshotMoveData bool defaultSnapshotMoveData bool
privilegedNodeAgent bool privilegedNodeAgent bool
disableInformerCache bool
} }
func WithImage(image string) podTemplateOption { func WithImage(image string) podTemplateOption {
@@ -144,6 +145,12 @@ func WithDefaultSnapshotMoveData() podTemplateOption {
} }
} }
// WithDisableInformerCache returns a podTemplateOption that disables the
// informer cache used for Get calls during restore (adds
// --disable-informer-cache=true to the server deployment args).
func WithDisableInformerCache() podTemplateOption {
	return func(c *podTemplateConfig) {
		c.disableInformerCache = true
	}
}
func WithServiceAccountName(sa string) podTemplateOption { func WithServiceAccountName(sa string) podTemplateOption {
return func(c *podTemplateConfig) { return func(c *podTemplateConfig) {
c.serviceAccountName = sa c.serviceAccountName = sa
@@ -185,6 +192,10 @@ func Deployment(namespace string, opts ...podTemplateOption) *appsv1.Deployment
args = append(args, "--default-snapshot-move-data=true") args = append(args, "--default-snapshot-move-data=true")
} }
if c.disableInformerCache {
args = append(args, "--disable-informer-cache=true")
}
if len(c.uploaderType) > 0 { if len(c.uploaderType) > 0 {
args = append(args, fmt.Sprintf("--uploader-type=%s", c.uploaderType)) args = append(args, fmt.Sprintf("--uploader-type=%s", c.uploaderType))
} }

View File

@@ -64,4 +64,8 @@ func TestDeployment(t *testing.T) {
deploy = Deployment("velero", WithServiceAccountName("test-sa")) deploy = Deployment("velero", WithServiceAccountName("test-sa"))
assert.Equal(t, "test-sa", deploy.Spec.Template.Spec.ServiceAccountName) assert.Equal(t, "test-sa", deploy.Spec.Template.Spec.ServiceAccountName)
deploy = Deployment("velero", WithDisableInformerCache())
assert.Len(t, deploy.Spec.Template.Spec.Containers[0].Args, 2)
assert.Equal(t, "--disable-informer-cache=true", deploy.Spec.Template.Spec.Containers[0].Args[1])
} }

View File

@@ -253,6 +253,7 @@ type VeleroOptions struct {
DefaultVolumesToFsBackup bool DefaultVolumesToFsBackup bool
UploaderType string UploaderType string
DefaultSnapshotMoveData bool DefaultSnapshotMoveData bool
DisableInformerCache bool
} }
func AllCRDs() *unstructured.UnstructuredList { func AllCRDs() *unstructured.UnstructuredList {
@@ -357,6 +358,10 @@ func AllResources(o *VeleroOptions) *unstructured.UnstructuredList {
deployOpts = append(deployOpts, WithDefaultSnapshotMoveData()) deployOpts = append(deployOpts, WithDefaultSnapshotMoveData())
} }
if o.DisableInformerCache {
deployOpts = append(deployOpts, WithDisableInformerCache())
}
deploy := Deployment(o.Namespace, deployOpts...) deploy := Deployment(o.Namespace, deployOpts...)
if err := appendUnstructured(resources, deploy); err != nil { if err := appendUnstructured(resources, deploy); err != nil {

View File

@@ -51,14 +51,15 @@ func resourceKey(obj runtime.Object) string {
type Request struct { type Request struct {
*velerov1api.Restore *velerov1api.Restore
Log logrus.FieldLogger Log logrus.FieldLogger
Backup *velerov1api.Backup Backup *velerov1api.Backup
PodVolumeBackups []*velerov1api.PodVolumeBackup PodVolumeBackups []*velerov1api.PodVolumeBackup
VolumeSnapshots []*volume.Snapshot VolumeSnapshots []*volume.Snapshot
BackupReader io.Reader BackupReader io.Reader
RestoredItems map[itemKey]restoredItemStatus RestoredItems map[itemKey]restoredItemStatus
itemOperationsList *[]*itemoperation.RestoreOperation itemOperationsList *[]*itemoperation.RestoreOperation
ResourceModifiers *resourcemodifiers.ResourceModifiers ResourceModifiers *resourcemodifiers.ResourceModifiers
DisableInformerCache bool
} }
type restoredItemStatus struct { type restoredItemStatus struct {

View File

@@ -22,6 +22,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"os/signal"
"path/filepath" "path/filepath"
"sort" "sort"
"strings" "strings"
@@ -42,6 +43,8 @@ import (
kubeerrs "k8s.io/apimachinery/pkg/util/errors" kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1" corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
crclient "sigs.k8s.io/controller-runtime/pkg/client" crclient "sigs.k8s.io/controller-runtime/pkg/client"
@@ -299,6 +302,8 @@ func (kr *kubernetesRestorer) RestoreWithResolvers(
resourceTerminatingTimeout: kr.resourceTerminatingTimeout, resourceTerminatingTimeout: kr.resourceTerminatingTimeout,
resourceTimeout: kr.resourceTimeout, resourceTimeout: kr.resourceTimeout,
resourceClients: make(map[resourceClientKey]client.Dynamic), resourceClients: make(map[resourceClientKey]client.Dynamic),
dynamicInformerFactories: make(map[string]*informerFactoryWithContext),
resourceInformers: make(map[resourceClientKey]informers.GenericInformer),
restoredItems: req.RestoredItems, restoredItems: req.RestoredItems,
renamedPVs: make(map[string]string), renamedPVs: make(map[string]string),
pvRenamer: kr.pvRenamer, pvRenamer: kr.pvRenamer,
@@ -312,6 +317,7 @@ func (kr *kubernetesRestorer) RestoreWithResolvers(
kbClient: kr.kbClient, kbClient: kr.kbClient,
itemOperationsList: req.GetItemOperationsList(), itemOperationsList: req.GetItemOperationsList(),
resourceModifiers: req.ResourceModifiers, resourceModifiers: req.ResourceModifiers,
disableInformerCache: req.DisableInformerCache,
} }
return restoreCtx.execute() return restoreCtx.execute()
@@ -345,6 +351,8 @@ type restoreContext struct {
resourceTerminatingTimeout time.Duration resourceTerminatingTimeout time.Duration
resourceTimeout time.Duration resourceTimeout time.Duration
resourceClients map[resourceClientKey]client.Dynamic resourceClients map[resourceClientKey]client.Dynamic
dynamicInformerFactories map[string]*informerFactoryWithContext
resourceInformers map[resourceClientKey]informers.GenericInformer
restoredItems map[itemKey]restoredItemStatus restoredItems map[itemKey]restoredItemStatus
renamedPVs map[string]string renamedPVs map[string]string
pvRenamer func(string) (string, error) pvRenamer func(string) (string, error)
@@ -359,6 +367,7 @@ type restoreContext struct {
kbClient crclient.Client kbClient crclient.Client
itemOperationsList *[]*itemoperation.RestoreOperation itemOperationsList *[]*itemoperation.RestoreOperation
resourceModifiers *resourcemodifiers.ResourceModifiers resourceModifiers *resourcemodifiers.ResourceModifiers
disableInformerCache bool
} }
type resourceClientKey struct { type resourceClientKey struct {
@@ -366,6 +375,12 @@ type resourceClientKey struct {
namespace string namespace string
} }
// informerFactoryWithContext bundles a per-namespace dynamic shared informer
// factory with the context used to start its informers and the cancel func
// used to stop them when the restore finishes.
// NOTE(review): storing a context in a struct is discouraged by Go convention;
// here it ties informer lifetime to the restore — confirm this is intentional.
type informerFactoryWithContext struct {
	factory dynamicinformer.DynamicSharedInformerFactory
	// context whose Done channel is passed to factory.Start/WaitForCacheSync
	context go_context.Context
	// cancel stops all informers started from this factory
	cancel go_context.CancelFunc
}
// getOrderedResources returns an ordered list of resource identifiers to restore, // getOrderedResources returns an ordered list of resource identifiers to restore,
// based on the provided resource priorities and backup contents. The returned list // based on the provided resource priorities and backup contents. The returned list
// begins with all of the high prioritized resources (in order), ends with all of // begins with all of the high prioritized resources (in order), ends with all of
@@ -416,6 +431,17 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) {
} }
}() }()
// Need to stop all informers if enabled
if !ctx.disableInformerCache {
defer func() {
// Call the cancel func to close the channel for each started informer
for _, factory := range ctx.dynamicInformerFactories {
factory.cancel()
}
// After upgrading to client-go 0.27 or newer, also call Shutdown for each informer factory
}()
}
// Need to set this for additionalItems to be restored. // Need to set this for additionalItems to be restored.
ctx.restoreDir = dir ctx.restoreDir = dir
@@ -520,6 +546,32 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) {
warnings.Merge(&w) warnings.Merge(&w)
errs.Merge(&e) errs.Merge(&e)
// initialize informer caches for selected resources if enabled
if !ctx.disableInformerCache {
// CRD informer will have already been initialized if any CRDs were created,
// but already-initialized informers aren't re-initialized because getGenericInformer
// looks for an existing one first.
factoriesToStart := make(map[string]*informerFactoryWithContext)
for _, informerResource := range selectedResourceCollection {
gr := schema.ParseGroupResource(informerResource.resource)
for _, items := range informerResource.selectedItemsByNamespace {
// don't use ns key since it represents original ns, not mapped ns
if len(items) == 0 {
continue
}
// use the first item in the list to initialize the informer. The rest of the list
// should share the same gvr and namespace
_, factory := ctx.getGenericInformerInternal(gr, items[0].version, items[0].targetNamespace)
if factory != nil {
factoriesToStart[items[0].targetNamespace] = factory
}
}
}
for _, factoryWithContext := range factoriesToStart {
factoryWithContext.factory.WaitForCacheSync(factoryWithContext.context.Done())
}
}
// reset processedItems and totalItems before processing full resource list // reset processedItems and totalItems before processing full resource list
processedItems = 0 processedItems = 0
totalItems = 0 totalItems = 0
@@ -934,11 +986,14 @@ func (ctx *restoreContext) itemsAvailable(action framework.RestoreItemResolvedAc
return available, err return available, err
} }
func (ctx *restoreContext) getResourceClient(groupResource schema.GroupResource, obj *unstructured.Unstructured, namespace string) (client.Dynamic, error) { func getResourceClientKey(groupResource schema.GroupResource, version, namespace string) resourceClientKey {
key := resourceClientKey{ return resourceClientKey{
resource: groupResource.WithVersion(obj.GroupVersionKind().Version), resource: groupResource.WithVersion(version),
namespace: namespace, namespace: namespace,
} }
}
func (ctx *restoreContext) getResourceClient(groupResource schema.GroupResource, obj *unstructured.Unstructured, namespace string) (client.Dynamic, error) {
key := getResourceClientKey(groupResource, obj.GroupVersionKind().Version, namespace)
if client, ok := ctx.resourceClients[key]; ok { if client, ok := ctx.resourceClients[key]; ok {
return client, nil return client, nil
@@ -962,6 +1017,49 @@ func (ctx *restoreContext) getResourceClient(groupResource schema.GroupResource,
return client, nil return client, nil
} }
// if new informer is created, non-nil factory is returned
func (ctx *restoreContext) getGenericInformerInternal(groupResource schema.GroupResource, version, namespace string) (informers.GenericInformer, *informerFactoryWithContext) {
var returnFactory *informerFactoryWithContext
key := getResourceClientKey(groupResource, version, namespace)
factoryWithContext, ok := ctx.dynamicInformerFactories[key.namespace]
if !ok {
factory := ctx.dynamicFactory.DynamicSharedInformerFactoryForNamespace(namespace)
informerContext, informerCancel := signal.NotifyContext(go_context.Background(), os.Interrupt)
factoryWithContext = &informerFactoryWithContext{
factory: factory,
context: informerContext,
cancel: informerCancel,
}
ctx.dynamicInformerFactories[key.namespace] = factoryWithContext
}
informer, ok := ctx.resourceInformers[key]
if !ok {
ctx.log.Infof("[debug] Creating factory for %s in namespace %s", key.resource, key.namespace)
informer = factoryWithContext.factory.ForResource(key.resource)
factoryWithContext.factory.Start(factoryWithContext.context.Done())
ctx.resourceInformers[key] = informer
returnFactory = factoryWithContext
}
return informer, returnFactory
}
// getGenericInformer returns the shared informer for the given resource,
// waiting for its cache to sync first whenever the informer was newly created.
func (ctx *restoreContext) getGenericInformer(groupResource schema.GroupResource, version, namespace string) informers.GenericInformer {
	informer, newFactory := ctx.getGenericInformerInternal(groupResource, version, namespace)
	if newFactory == nil {
		// Informer already existed; its cache was synced on creation.
		return informer
	}
	newFactory.factory.WaitForCacheSync(newFactory.context.Done())
	return informer
}
// getResourceLister returns a lister backed by the informer cache for the
// object's resource: cluster-scoped when namespace is empty, otherwise scoped
// to the given namespace. The informer cache is synced before returning.
func (ctx *restoreContext) getResourceLister(groupResource schema.GroupResource, obj *unstructured.Unstructured, namespace string) cache.GenericNamespaceLister {
	informer := ctx.getGenericInformer(groupResource, obj.GroupVersionKind().Version, namespace)
	// Early return instead of else-after-return (Go idiom).
	if namespace == "" {
		return informer.Lister()
	}
	return informer.Lister().ByNamespace(namespace)
}
func getResourceID(groupResource schema.GroupResource, namespace, name string) string { func getResourceID(groupResource schema.GroupResource, namespace, name string) string {
if namespace == "" { if namespace == "" {
return fmt.Sprintf("%s/%s", groupResource.String(), name) return fmt.Sprintf("%s/%s", groupResource.String(), name)
@@ -970,6 +1068,20 @@ func getResourceID(groupResource schema.GroupResource, namespace, name string) s
return fmt.Sprintf("%s/%s/%s", groupResource.String(), namespace, name) return fmt.Sprintf("%s/%s/%s", groupResource.String(), namespace, name)
} }
// getResource fetches the named object from the informer cache (lister)
// instead of querying the API server directly. Returns an error if the item
// is absent from the cache or is not an *unstructured.Unstructured.
func (ctx *restoreContext) getResource(groupResource schema.GroupResource, obj *unstructured.Unstructured, namespace, name string) (*unstructured.Unstructured, error) {
	lister := ctx.getResourceLister(groupResource, obj, namespace)
	clusterObj, err := lister.Get(name)
	if err != nil {
		return nil, errors.Wrapf(err, "error getting resource from lister for %s, %s/%s", groupResource, namespace, name)
	}
	u, ok := clusterObj.(*unstructured.Unstructured)
	if !ok {
		// Use %T on clusterObj, the value actually returned: the original
		// formatted the failed-assertion target u, whose static type makes
		// %T always print *unstructured.Unstructured. Build the error once.
		err := fmt.Errorf("expected *unstructured.Unstructured but got %T", clusterObj)
		ctx.log.WithError(errors.WithStack(err)).Error("unable to understand entry returned from client")
		return nil, err
	}
	return u, nil
}
func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupResource schema.GroupResource, namespace string) (results.Result, results.Result, bool) { func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupResource schema.GroupResource, namespace string) (results.Result, results.Result, bool) {
warnings, errs := results.Result{}, results.Result{} warnings, errs := results.Result{}, results.Result{}
// itemExists bool is used to determine whether to include this item in the "wait for additional items" list // itemExists bool is used to determine whether to include this item in the "wait for additional items" list
@@ -1163,6 +1275,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
ctx.renamedPVs[oldName] = pvName ctx.renamedPVs[oldName] = pvName
obj.SetName(pvName) obj.SetName(pvName)
name = pvName
// Add the original PV name as an annotation. // Add the original PV name as an annotation.
annotations := obj.GetAnnotations() annotations := obj.GetAnnotations()
@@ -1382,27 +1495,44 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
} }
ctx.log.Infof("Attempting to restore %s: %v", obj.GroupVersionKind().Kind, name) ctx.log.Infof("Attempting to restore %s: %v", obj.GroupVersionKind().Kind, name)
createdObj, restoreErr := resourceClient.Create(obj)
if restoreErr == nil { // check if we want to treat the error as a warning, in some cases the creation call might not get executed due to object API validations
itemExists = true // and Velero might not get the already exists error type but in reality the object already exists
ctx.restoredItems[itemKey] = restoredItemStatus{action: itemRestoreResultCreated, itemExists: itemExists} var fromCluster, createdObj *unstructured.Unstructured
var restoreErr error
// only attempt Get before Create if using informer cache, otherwise this will slow down restore into
// new namespace
if !ctx.disableInformerCache {
ctx.log.Debugf("Checking for existence %s: %v", obj.GroupVersionKind().Kind, name)
fromCluster, err = ctx.getResource(groupResource, obj, namespace, name)
} }
if err != nil || fromCluster == nil {
// couldn't find the resource, attempt to create
ctx.log.Debugf("Creating %s: %v", obj.GroupVersionKind().Kind, name)
createdObj, restoreErr = resourceClient.Create(obj)
if restoreErr == nil {
itemExists = true
ctx.restoredItems[itemKey] = restoredItemStatus{action: itemRestoreResultCreated, itemExists: itemExists}
}
}
isAlreadyExistsError, err := isAlreadyExistsError(ctx, obj, restoreErr, resourceClient) isAlreadyExistsError, err := isAlreadyExistsError(ctx, obj, restoreErr, resourceClient)
if err != nil { if err != nil {
errs.Add(namespace, err) errs.Add(namespace, err)
return warnings, errs, itemExists return warnings, errs, itemExists
} }
// check if we want to treat the error as a warning, in some cases the creation call might not get executed due to object API validations
// and Velero might not get the already exists error type but in reality the object already exists
var fromCluster *unstructured.Unstructured
if restoreErr != nil { if restoreErr != nil {
// check for the existence of the object in cluster, if no error then it implies that object exists // check for the existence of the object in cluster, if no error then it implies that object exists
// and if err then we want to judge whether there is an existing error in the previous creation. // and if err then we want to judge whether there is an existing error in the previous creation.
// if so, we will return the 'get' error. // if so, we will return the 'get' error.
// otherwise, we will return the original creation error. // otherwise, we will return the original creation error.
fromCluster, err = resourceClient.Get(name, metav1.GetOptions{}) if !ctx.disableInformerCache {
fromCluster, err = ctx.getResource(groupResource, obj, namespace, name)
} else {
fromCluster, err = resourceClient.Get(name, metav1.GetOptions{})
}
if err != nil && isAlreadyExistsError { if err != nil && isAlreadyExistsError {
ctx.log.Errorf("Error retrieving in-cluster version of %s: %v", kube.NamespaceAndName(obj), err) ctx.log.Errorf("Error retrieving in-cluster version of %s: %v", kube.NamespaceAndName(obj), err)
errs.Add(namespace, err) errs.Add(namespace, err)
@@ -1947,6 +2077,7 @@ type restoreableItem struct {
path string path string
targetNamespace string targetNamespace string
name string name string
version string // used for initializing informer cache
} }
// getOrderedResourceCollection iterates over list of ordered resource // getOrderedResourceCollection iterates over list of ordered resource
@@ -2136,6 +2267,7 @@ func (ctx *restoreContext) getSelectedRestoreableItems(resource, targetNamespace
path: itemPath, path: itemPath,
name: item, name: item,
targetNamespace: targetNamespace, targetNamespace: targetNamespace,
version: obj.GroupVersionKind().Version,
} }
restorable.selectedItemsByNamespace[originalNamespace] = restorable.selectedItemsByNamespace[originalNamespace] =
append(restorable.selectedItemsByNamespace[originalNamespace], selectedItem) append(restorable.selectedItemsByNamespace[originalNamespace], selectedItem)

View File

@@ -861,6 +861,7 @@ func TestRestoreItems(t *testing.T) {
tarball io.Reader tarball io.Reader
want []*test.APIResource want []*test.APIResource
expectedRestoreItems map[itemKey]restoredItemStatus expectedRestoreItems map[itemKey]restoredItemStatus
disableInformer bool
}{ }{
{ {
name: "metadata uid/resourceVersion/etc. gets removed", name: "metadata uid/resourceVersion/etc. gets removed",
@@ -1017,6 +1018,26 @@ func TestRestoreItems(t *testing.T) {
apiResources: []*test.APIResource{ apiResources: []*test.APIResource{
test.Secrets(builder.ForSecret("ns-1", "sa-1").Data(map[string][]byte{"foo": []byte("bar")}).Result()), test.Secrets(builder.ForSecret("ns-1", "sa-1").Data(map[string][]byte{"foo": []byte("bar")}).Result()),
}, },
disableInformer: true,
want: []*test.APIResource{
test.Secrets(builder.ForSecret("ns-1", "sa-1").ObjectMeta(builder.WithLabels("velero.io/backup-name", "backup-1", "velero.io/restore-name", "restore-1")).Data(map[string][]byte{"key-1": []byte("value-1")}).Result()),
},
expectedRestoreItems: map[itemKey]restoredItemStatus{
{resource: "v1/Namespace", namespace: "", name: "ns-1"}: {action: "created", itemExists: true},
{resource: "v1/Secret", namespace: "ns-1", name: "sa-1"}: {action: "updated", itemExists: true},
},
},
{
name: "update secret data and labels when secret exists in cluster and is not identical to the backed up one, existing resource policy is update, using informer cache",
restore: defaultRestore().ExistingResourcePolicy("update").Result(),
backup: defaultBackup().Result(),
tarball: test.NewTarWriter(t).
AddItems("secrets", builder.ForSecret("ns-1", "sa-1").Data(map[string][]byte{"key-1": []byte("value-1")}).Result()).
Done(),
apiResources: []*test.APIResource{
test.Secrets(builder.ForSecret("ns-1", "sa-1").Data(map[string][]byte{"foo": []byte("bar")}).Result()),
},
disableInformer: false,
want: []*test.APIResource{ want: []*test.APIResource{
test.Secrets(builder.ForSecret("ns-1", "sa-1").ObjectMeta(builder.WithLabels("velero.io/backup-name", "backup-1", "velero.io/restore-name", "restore-1")).Data(map[string][]byte{"key-1": []byte("value-1")}).Result()), test.Secrets(builder.ForSecret("ns-1", "sa-1").ObjectMeta(builder.WithLabels("velero.io/backup-name", "backup-1", "velero.io/restore-name", "restore-1")).Data(map[string][]byte{"key-1": []byte("value-1")}).Result()),
}, },
@@ -1175,13 +1196,14 @@ func TestRestoreItems(t *testing.T) {
} }
data := &Request{ data := &Request{
Log: h.log, Log: h.log,
Restore: tc.restore, Restore: tc.restore,
Backup: tc.backup, Backup: tc.backup,
PodVolumeBackups: nil, PodVolumeBackups: nil,
VolumeSnapshots: nil, VolumeSnapshots: nil,
BackupReader: tc.tarball, BackupReader: tc.tarball,
RestoredItems: map[itemKey]restoredItemStatus{}, RestoredItems: map[itemKey]restoredItemStatus{},
DisableInformerCache: tc.disableInformer,
} }
warnings, errs := h.restorer.Restore( warnings, errs := h.restorer.Restore(
data, data,

View File

@@ -59,6 +59,7 @@ func NewAPIServer(t *testing.T) *APIServer {
{Group: "velero.io", Version: "v1", Resource: "backups"}: "BackupList", {Group: "velero.io", Version: "v1", Resource: "backups"}: "BackupList",
{Group: "extensions", Version: "v1", Resource: "deployments"}: "ExtDeploymentsList", {Group: "extensions", Version: "v1", Resource: "deployments"}: "ExtDeploymentsList",
{Group: "velero.io", Version: "v1", Resource: "deployments"}: "VeleroDeploymentsList", {Group: "velero.io", Version: "v1", Resource: "deployments"}: "VeleroDeploymentsList",
{Group: "velero.io", Version: "v2alpha1", Resource: "datauploads"}: "DataUploadsList",
}) })
discoveryClient = &DiscoveryClient{FakeDiscovery: kubeClient.Discovery().(*discoveryfake.FakeDiscovery)} discoveryClient = &DiscoveryClient{FakeDiscovery: kubeClient.Discovery().(*discoveryfake.FakeDiscovery)}
) )

View File

@@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic/dynamicinformer"
"github.com/vmware-tanzu/velero/pkg/client" "github.com/vmware-tanzu/velero/pkg/client"
) )
@@ -37,6 +38,11 @@ func (df *FakeDynamicFactory) ClientForGroupVersionResource(gv schema.GroupVersi
return args.Get(0).(client.Dynamic), args.Error(1) return args.Get(0).(client.Dynamic), args.Error(1)
} }
// DynamicSharedInformerFactoryForNamespace returns the mocked
// DynamicSharedInformerFactory configured for the given namespace,
// recording the call so tests can assert on it.
func (df *FakeDynamicFactory) DynamicSharedInformerFactoryForNamespace(namespace string) dynamicinformer.DynamicSharedInformerFactory {
	return df.Called(namespace).Get(0).(dynamicinformer.DynamicSharedInformerFactory)
}
type FakeDynamicClient struct { type FakeDynamicClient struct {
mock.Mock mock.Mock
} }