Merge pull request #4306 from alaypatel07/fix-paging

fix buggy pager func
This commit is contained in:
Wenkai Yin(尹文开)
2021-11-10 07:53:58 +08:00
committed by GitHub
2 changed files with 25 additions and 33 deletions

View File

@@ -0,0 +1 @@
fix buggy pager func

View File

@@ -26,7 +26,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
@@ -295,7 +295,6 @@ func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.Group
if selector := r.backupRequest.Spec.LabelSelector; selector != nil {
labelSelector = metav1.FormatLabelSelector(selector)
}
listOptions := metav1.ListOptions{LabelSelector: labelSelector}
log.Info("Listing items")
unstructuredItems := make([]unstructured.Unstructured, 0)
@@ -303,50 +302,42 @@ func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.Group
if r.pageSize > 0 {
// If limit is positive, use a pager to split list over multiple requests
// Use Velero's dynamic list function instead of the default
listFunc := pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
list, err := resourceClient.List(listOptions)
if err != nil {
return nil, err
}
return list, nil
})
listPager := pager.New(listFunc)
listPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return resourceClient.List(opts)
}))
// Use the page size defined in the server config
// TODO allow configuration of page buffer size
listPager.PageSize = int64(r.pageSize)
// Add each item to temporary slice
var items []unstructured.Unstructured
err := listPager.EachListItem(context.Background(), listOptions, func(object runtime.Object) error {
item, isUnstructured := object.(*unstructured.Unstructured)
if !isUnstructured {
// We should never hit this
log.Error("Got type other than Unstructured from pager func")
return nil
}
items = append(items, *item)
return nil
})
if statusError, isStatusError := err.(*apierrors.StatusError); isStatusError && statusError.Status().Reason == metav1.StatusReasonExpired {
log.WithError(errors.WithStack(err)).Error("Error paging item list. Falling back on unpaginated list")
unstructuredList, err := resourceClient.List(listOptions)
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error listing items")
continue
}
items = unstructuredList.Items
} else if err != nil {
log.WithError(errors.WithStack(err)).Error("Error paging item list")
list, paginated, err := listPager.List(context.Background(), metav1.ListOptions{LabelSelector: labelSelector})
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error listing resources")
continue
}
if !paginated {
log.Infof("list for groupResource %s was not paginated", gr)
}
err = meta.EachListItem(list, func(object runtime.Object) error {
u, ok := object.(*unstructured.Unstructured)
if !ok {
log.WithError(errors.WithStack(fmt.Errorf("expected *unstructured.Unstructured but got %T", u))).Error("unable to understand entry in the list")
return fmt.Errorf("expected *unstructured.Unstructured but got %T", u)
}
unstructuredItems = append(unstructuredItems, *u)
return nil
})
if err != nil {
log.WithError(errors.WithStack(err)).Error("unable to understand paginated list")
continue
}
unstructuredItems = append(unstructuredItems, items...)
} else {
// If limit is not positive, do not use paging. Instead, request all items at once
unstructuredList, err := resourceClient.List(metav1.ListOptions{LabelSelector: labelSelector})
unstructuredItems = append(unstructuredItems, unstructuredList.Items...)
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error listing items")
continue
}
unstructuredItems = append(unstructuredItems, unstructuredList.Items...)
}
log.Infof("Retrieved %d items", len(unstructuredItems))