Modify the namespace filter logic for backups that use label selectors.

Collect all namespaces up front and let the new nsTracker decide, after item collection, which namespaces to keep: namespaces selected by the backup's include/exclude filter, namespaces whose labels match the backup's LabelSelector or OrLabelSelectors, and namespaces that contain resources selected by those label selectors.

Signed-off-by: Xun Jiang <blackpigletbruce@gmail.com>
Author: Xun Jiang
Date:   2024-04-17 18:52:11 +08:00
parent f1f0c8e5a7
commit 2eeaf4d55e
5 changed files with 502 additions and 105 deletions


@@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/pager"
@@ -50,6 +51,107 @@ type itemCollector struct {
cohabitatingResources map[string]*cohabitatingResource
dir string
pageSize int
nsTracker nsTracker
}
// nsTracker is used to integrate several namespace filters together:
// 1. the backup's namespace include and exclude filters;
// 2. namespaces selected by the backup's LabelSelector and OrLabelSelectors;
// 3. the namespaces of resources selected by those label selectors.
type nsTracker struct {
singleLabelSelector labels.Selector
orLabelSelector []labels.Selector
namespaceFilter *collections.IncludesExcludes
logger logrus.FieldLogger
namespaceMap map[string]bool
}
func (nt *nsTracker) track(ns string) {
if nt.namespaceMap == nil {
nt.namespaceMap = make(map[string]bool)
}
if _, ok := nt.namespaceMap[ns]; !ok {
nt.namespaceMap[ns] = true
}
}
func (nt *nsTracker) isTracked(ns string) bool {
if nt.namespaceMap != nil {
return nt.namespaceMap[ns]
}
return false
}
func (nt *nsTracker) init(
unstructuredNSs []unstructured.Unstructured,
singleLabelSelector labels.Selector,
orLabelSelector []labels.Selector,
namespaceFilter *collections.IncludesExcludes,
logger logrus.FieldLogger,
) {
nt.namespaceMap = make(map[string]bool)
nt.singleLabelSelector = singleLabelSelector
nt.orLabelSelector = orLabelSelector
nt.namespaceFilter = namespaceFilter
nt.logger = logger
for _, namespace := range unstructuredNSs {
if nt.singleLabelSelector != nil &&
nt.singleLabelSelector.Matches(labels.Set(namespace.GetLabels())) {
nt.logger.Debugf("Track namespace %s because its labels match the backup LabelSelector.",
namespace.GetName())
nt.track(namespace.GetName())
continue
}
if len(nt.orLabelSelector) > 0 {
for _, selector := range nt.orLabelSelector {
if selector.Matches(labels.Set(namespace.GetLabels())) {
nt.logger.Debugf("Track namespace %s because its labels match the backup OrLabelSelector.",
namespace.GetName())
nt.track(namespace.GetName())
continue
}
}
}
// Skip the namespace when the backup's namespace filter has the
// default value (*) and the namespace doesn't match the backup's
// LabelSelector or OrLabelSelectors.
// https://github.com/vmware-tanzu/velero/issues/7105
if nt.namespaceFilter.ShouldInclude("*") &&
(nt.singleLabelSelector != nil || len(nt.orLabelSelector) > 0) {
nt.logger.Debugf("Skip namespace %s because the backup's namespace filter is * and the backup has a label selector.",
namespace.GetName())
continue
}
if nt.namespaceFilter.ShouldInclude(namespace.GetName()) {
nt.logger.Debugf("Track namespace %s because its name matches the backup namespace filter.",
namespace.GetName())
nt.track(namespace.GetName())
}
}
}
// filterNamespaces only keeps the namespaces tracked by the nsTracker.
func (nt *nsTracker) filterNamespaces(
resources []*kubernetesResource,
) []*kubernetesResource {
result := make([]*kubernetesResource, 0)
for _, resource := range resources {
if resource.groupResource != kuberesource.Namespaces ||
nt.isTracked(resource.name) {
result = append(result, resource)
}
}
return result
}
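
For illustration only (not part of this commit): a minimal in-package sketch of the tracker's set semantics, using made-up namespace and resource names. filterNamespaces keeps every non-namespace item and drops only namespace entries that were never tracked.

    // Hypothetical snippet, assumed to live in the same package as the collector
    // (e.g. an item_collector_test.go), since nsTracker and kubernetesResource
    // are unexported.
    tracker := nsTracker{}
    tracker.track("ns-a") // ns-b is intentionally never tracked

    input := []*kubernetesResource{
        {groupResource: kuberesource.Namespaces, name: "ns-a"}, // kept: tracked namespace
        {groupResource: kuberesource.Namespaces, name: "ns-b"}, // dropped: untracked namespace
        {groupResource: kuberesource.Pods, name: "web"},        // kept: not a namespace
    }

    filtered := tracker.filterNamespaces(input)
    // filtered contains the ns-a namespace and the pod; the ns-b entry is gone.
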
type kubernetesResource struct {
@@ -60,17 +162,22 @@ type kubernetesResource struct {
// getItemsFromResourceIdentifiers converts ResourceIdentifiers to
// kubernetesResources
func (r *itemCollector) getItemsFromResourceIdentifiers(resourceIDs []velero.ResourceIdentifier) []*kubernetesResource {
func (r *itemCollector) getItemsFromResourceIdentifiers(
resourceIDs []velero.ResourceIdentifier,
) []*kubernetesResource {
grResourceIDsMap := make(map[schema.GroupResource][]velero.ResourceIdentifier)
for _, resourceID := range resourceIDs {
grResourceIDsMap[resourceID.GroupResource] = append(grResourceIDsMap[resourceID.GroupResource], resourceID)
grResourceIDsMap[resourceID.GroupResource] = append(
grResourceIDsMap[resourceID.GroupResource], resourceID)
}
return r.getItems(grResourceIDsMap)
}
// getAllItems gets all relevant items from all API groups, then drops
// namespaces that the nsTracker did not track.
func (r *itemCollector) getAllItems() []*kubernetesResource {
return r.getItems(nil)
resources := r.getItems(nil)
return r.nsTracker.filterNamespaces(resources)
}
// getItems gets all relevant items from all API groups.
@@ -81,12 +188,15 @@ func (r *itemCollector) getAllItems() []*kubernetesResource {
// this case, include/exclude rules are not invoked, since we already
// have the list of items, we just need the item collector/discovery
// helper to fill in the missing GVR, etc. context.
func (r *itemCollector) getItems(resourceIDsMap map[schema.GroupResource][]velero.ResourceIdentifier) []*kubernetesResource {
func (r *itemCollector) getItems(
resourceIDsMap map[schema.GroupResource][]velero.ResourceIdentifier,
) []*kubernetesResource {
var resources []*kubernetesResource
for _, group := range r.discoveryHelper.Resources() {
groupItems, err := r.getGroupItems(r.log, group, resourceIDsMap)
if err != nil {
r.log.WithError(err).WithField("apiGroup", group.String()).Error("Error collecting resources from API group")
r.log.WithError(err).WithField("apiGroup", group.String()).
Error("Error collecting resources from API group")
continue
}
@@ -99,7 +209,11 @@ func (r *itemCollector) getItems(resourceIDsMap map[schema.GroupResource][]veler
// getGroupItems collects all relevant items from a single API group.
// If resourceIDsMap is supplied, then only those items are returned,
// with GVR/APIResource metadata supplied.
func (r *itemCollector) getGroupItems(log logrus.FieldLogger, group *metav1.APIResourceList, resourceIDsMap map[schema.GroupResource][]velero.ResourceIdentifier) ([]*kubernetesResource, error) {
func (r *itemCollector) getGroupItems(
log logrus.FieldLogger,
group *metav1.APIResourceList,
resourceIDsMap map[schema.GroupResource][]velero.ResourceIdentifier,
) ([]*kubernetesResource, error) {
log = log.WithField("group", group.GroupVersion)
log.Infof("Getting items for group")
@@ -107,11 +221,12 @@ func (r *itemCollector) getGroupItems(log logrus.FieldLogger, group *metav1.APIR
// Parse so we can check if this is the core group
gv, err := schema.ParseGroupVersion(group.GroupVersion)
if err != nil {
return nil, errors.Wrapf(err, "error parsing GroupVersion %q", group.GroupVersion)
return nil, errors.Wrapf(err, "error parsing GroupVersion %q",
group.GroupVersion)
}
if gv.Group == "" {
// This is the core group, so make sure we process in the following order: pods, pvcs, pvs,
// everything else.
// This is the core group, so make sure we process in the following order:
// pods, pvcs, pvs, everything else.
sortCoreGroup(group)
}
@@ -119,7 +234,8 @@ func (r *itemCollector) getGroupItems(log logrus.FieldLogger, group *metav1.APIR
for _, resource := range group.APIResources {
resourceItems, err := r.getResourceItems(log, gv, resource, resourceIDsMap)
if err != nil {
log.WithError(err).WithField("resource", resource.String()).Error("Error getting items for resource")
log.WithError(err).WithField("resource", resource.String()).
Error("Error getting items for resource")
continue
}
@@ -129,8 +245,13 @@ func (r *itemCollector) getGroupItems(log logrus.FieldLogger, group *metav1.APIR
return items, nil
}
// sortResourcesByOrder sorts items by the names specified in "order". Items are not in order will be put at the end in original order.
func sortResourcesByOrder(log logrus.FieldLogger, items []*kubernetesResource, order []string) []*kubernetesResource {
// sortResourcesByOrder sorts items by the names specified in "order".
// Items not in the order list will be put at the end in their original order.
func sortResourcesByOrder(
log logrus.FieldLogger,
items []*kubernetesResource,
order []string,
) []*kubernetesResource {
if len(order) == 0 {
return items
}
@@ -175,7 +296,10 @@ func sortResourcesByOrder(log logrus.FieldLogger, items []*kubernetesResource, o
}
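
A worked example of the ordering rule, illustrative only and with made-up names: given order ["pod-2", "pod-1"] and collected items [pod-1, pod-2, pod-3], the result is [pod-2, pod-1, pod-3]. The same idea as a standalone, name-only sketch:

    // sortByOrder is a simplified, name-only rendering of sortResourcesByOrder:
    // names listed in order come first, everything else keeps its original
    // relative position at the end.
    func sortByOrder(names, order []string) []string {
        ordered := make(map[string]bool, len(order))
        for _, n := range order {
            ordered[n] = true
        }
        present := make(map[string]bool, len(names))
        for _, n := range names {
            present[n] = true
        }

        result := make([]string, 0, len(names))
        for _, n := range order { // ordered names that actually exist in the input
            if present[n] {
                result = append(result, n)
            }
        }
        for _, n := range names { // the rest, in their original order
            if !ordered[n] {
                result = append(result, n)
            }
        }
        return result
    }

    // sortByOrder([]string{"pod-1", "pod-2", "pod-3"}, []string{"pod-2", "pod-1"})
    // returns [pod-2 pod-1 pod-3].
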
// getOrderedResourcesForType gets order of resourceType from orderResources.
func getOrderedResourcesForType(orderedResources map[string]string, resourceType string) []string {
func getOrderedResourcesForType(
orderedResources map[string]string,
resourceType string,
) []string {
if orderedResources == nil {
return nil
}
@@ -190,7 +314,12 @@ func getOrderedResourcesForType(orderedResources map[string]string, resourceType
// getResourceItems collects all relevant items for a given group-version-resource.
// If resourceIDsMap is supplied, the items will be pulled from here
// rather than from the cluster and applying include/exclude rules.
func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.GroupVersion, resource metav1.APIResource, resourceIDsMap map[schema.GroupResource][]velero.ResourceIdentifier) ([]*kubernetesResource, error) {
func (r *itemCollector) getResourceItems(
log logrus.FieldLogger,
gv schema.GroupVersion,
resource metav1.APIResource,
resourceIDsMap map[schema.GroupResource][]velero.ResourceIdentifier,
) ([]*kubernetesResource, error) {
log = log.WithField("resource", resource.Name)
log.Info("Getting items for resource")
@@ -200,7 +329,10 @@ func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.Group
gr = gvr.GroupResource()
)
orders := getOrderedResourcesForType(r.backupRequest.Backup.Spec.OrderedResources, resource.Name)
orders := getOrderedResourcesForType(
r.backupRequest.Backup.Spec.OrderedResources,
resource.Name,
)
// Getting the preferred group version of this resource
preferredGVR, _, err := r.discoveryHelper.ResourceFor(gr.WithVersion(""))
if err != nil {
@@ -222,7 +354,11 @@ func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.Group
"name": resourceID.Name,
},
).Infof("Getting item")
resourceClient, err := r.dynamicFactory.ClientForGroupVersionResource(gv, resource, resourceID.Namespace)
resourceClient, err := r.dynamicFactory.ClientForGroupVersionResource(
gv,
resource,
resourceID.Namespace,
)
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error getting client for resource")
continue
@@ -257,7 +393,8 @@ func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.Group
}
if cohabitator, found := r.cohabitatingResources[resource.Name]; found {
if gv.Group == cohabitator.groupResource1.Group || gv.Group == cohabitator.groupResource2.Group {
if gv.Group == cohabitator.groupResource1.Group ||
gv.Group == cohabitator.groupResource2.Group {
if cohabitator.seen {
log.WithFields(
logrus.Fields{
@@ -275,20 +412,13 @@ func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.Group
// Namespaces are handled specially: backupNamespaces collects every
// namespace here, and the nsTracker later decides which of them to keep,
// combining the include/exclude filter with the backup's label selectors.
if gr == kuberesource.Namespaces {
resourceClient, err := r.dynamicFactory.ClientForGroupVersionResource(gv, resource, "")
if err != nil {
log.WithError(err).Error("Error getting dynamic client")
return nil, errors.WithStack(err)
}
unstructuredList, err := resourceClient.List(metav1.ListOptions{})
if err != nil {
log.WithError(errors.WithStack(err)).Error("error list namespaces")
return nil, errors.WithStack(err)
}
items := r.backupNamespaces(unstructuredList, r.backupRequest.NamespaceIncludesExcludes, gr, preferredGVR, log)
return items, nil
return r.backupNamespaces(
resource,
gv,
gr,
preferredGVR,
log,
)
}
clusterScoped := !resource.Namespaced
@@ -302,57 +432,12 @@ func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.Group
var items []*kubernetesResource
for _, namespace := range namespacesToList {
// List items from Kubernetes API
log = log.WithField("namespace", namespace)
resourceClient, err := r.dynamicFactory.ClientForGroupVersionResource(gv, resource, namespace)
unstructuredItems, err := r.listResourceByLabelsPerNamespace(
namespace, gr, gv, resource, log)
if err != nil {
log.WithError(err).Error("Error getting dynamic client")
continue
}
var orLabelSelectors []string
if r.backupRequest.Spec.OrLabelSelectors != nil {
for _, s := range r.backupRequest.Spec.OrLabelSelectors {
orLabelSelectors = append(orLabelSelectors, metav1.FormatLabelSelector(s))
}
} else {
orLabelSelectors = []string{}
}
log.Info("Listing items")
unstructuredItems := make([]unstructured.Unstructured, 0)
// Listing items for orLabelSelectors
errListingForNS := false
for _, label := range orLabelSelectors {
unstructuredItems, err = r.listItemsForLabel(unstructuredItems, gr, label, resourceClient)
if err != nil {
errListingForNS = true
}
}
if errListingForNS {
log.WithError(err).Error("Error listing items")
continue
}
var labelSelector string
if selector := r.backupRequest.Spec.LabelSelector; selector != nil {
labelSelector = metav1.FormatLabelSelector(selector)
}
// Listing items for labelSelector (singular)
if len(orLabelSelectors) == 0 {
unstructuredItems, err = r.listItemsForLabel(unstructuredItems, gr, labelSelector, resourceClient)
if err != nil {
log.WithError(err).Error("Error listing items")
continue
}
}
log.Infof("Retrieved %d items", len(unstructuredItems))
// Collect items in included Namespaces
for i := range unstructuredItems {
item := &unstructuredItems[i]
@@ -370,8 +455,14 @@ func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.Group
name: item.GetName(),
path: path,
})
if item.GetNamespace() != "" {
log.Debugf("Track namespace %s in nsTracker", item.GetNamespace())
r.nsTracker.track(item.GetNamespace())
}
}
}
if len(orders) > 0 {
items = sortResourcesByOrder(r.log, items, orders)
}
@@ -379,6 +470,71 @@ func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.Group
return items, nil
}
func (r *itemCollector) listResourceByLabelsPerNamespace(
namespace string,
gr schema.GroupResource,
gv schema.GroupVersion,
resource metav1.APIResource,
logger logrus.FieldLogger,
) ([]unstructured.Unstructured, error) {
// List items from Kubernetes API
logger = logger.WithField("namespace", namespace)
resourceClient, err := r.dynamicFactory.ClientForGroupVersionResource(gv, resource, namespace)
if err != nil {
logger.WithError(err).Error("Error getting dynamic client")
return nil, err
}
var orLabelSelectors []string
if r.backupRequest.Spec.OrLabelSelectors != nil {
for _, s := range r.backupRequest.Spec.OrLabelSelectors {
orLabelSelectors = append(orLabelSelectors, metav1.FormatLabelSelector(s))
}
}
logger.Info("Listing items")
unstructuredItems := make([]unstructured.Unstructured, 0)
// Listing items for orLabelSelectors
errListingForNS := false
for _, label := range orLabelSelectors {
unstructuredItems, err = r.listItemsForLabel(unstructuredItems, gr, label, resourceClient)
if err != nil {
errListingForNS = true
}
}
if errListingForNS {
logger.WithError(err).Error("Error listing items")
return nil, err
}
var labelSelector string
if selector := r.backupRequest.Spec.LabelSelector; selector != nil {
labelSelector = metav1.FormatLabelSelector(selector)
}
// Listing items for labelSelector (singular)
if len(orLabelSelectors) == 0 {
unstructuredItems, err = r.listItemsForLabel(
unstructuredItems,
gr,
labelSelector,
resourceClient,
)
if err != nil {
logger.WithError(err).Error("Error listing items")
return nil, err
}
}
logger.Infof("Retrieved %d items", len(unstructuredItems))
return unstructuredItems, nil
}
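
The collector uses the backup's label selectors in two forms: metav1.FormatLabelSelector produces the string that goes into metav1.ListOptions{LabelSelector: ...} for server-side filtering, while metav1.LabelSelectorAsSelector (used by backupNamespaces below) produces a labels.Selector for in-memory matching. A small illustrative sketch with a made-up selector:

    package main

    import (
        "fmt"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/labels"
    )

    func main() {
        ls := &metav1.LabelSelector{MatchLabels: map[string]string{"app": "nginx"}}

        // String form, suitable for metav1.ListOptions{LabelSelector: ...}.
        fmt.Println(metav1.FormatLabelSelector(ls)) // "app=nginx"

        // Compiled form, suitable for matching labels already in memory.
        selector, err := metav1.LabelSelectorAsSelector(ls)
        if err != nil {
            panic(err)
        }
        fmt.Println(selector.Matches(labels.Set{"app": "nginx", "tier": "web"})) // true
        fmt.Println(selector.Matches(labels.Set{"app": "apache"}))               // false
    }
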
func (r *itemCollector) writeToFile(item *unstructured.Unstructured) (string, error) {
f, err := os.CreateTemp(r.dir, "")
if err != nil {
@@ -475,7 +631,11 @@ func newCohabitatingResource(resource, group1, group2 string) *cohabitatingResou
}
// function to process pager client calls when the pageSize is specified
func (r *itemCollector) processPagerClientCalls(gr schema.GroupResource, label string, resourceClient client.Dynamic) (runtime.Object, error) {
func (r *itemCollector) processPagerClientCalls(
gr schema.GroupResource,
label string,
resourceClient client.Dynamic,
) (runtime.Object, error) {
// If limit is positive, use a pager to split list over multiple requests
// Use Velero's dynamic list function instead of the default
listPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
@@ -499,7 +659,12 @@ func (r *itemCollector) processPagerClientCalls(gr schema.GroupResource, label s
return list, nil
}
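
For context on the pager path (a sketch under assumptions, not code from this commit): pager.New wraps a single-page list function, PageSize controls the chunk size, and List stitches the pages back together. listFn here is a hypothetical stand-in for the dynamic client's paginated List call.

    // listPages shows the client-go pager wiring used by processPagerClientCalls.
    func listPages(
        ctx context.Context,
        listFn func(opts metav1.ListOptions) (runtime.Object, error), // hypothetical
        label string,
        pageSize int,
    ) (runtime.Object, error) {
        listPager := pager.New(pager.SimplePageFunc(listFn))
        // The pager only chunks the request when PageSize is positive.
        listPager.PageSize = int64(pageSize)

        // The middle return value reports whether the result was actually paginated.
        list, _, err := listPager.List(ctx, metav1.ListOptions{LabelSelector: label})
        return list, err
    }
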
func (r *itemCollector) listItemsForLabel(unstructuredItems []unstructured.Unstructured, gr schema.GroupResource, label string, resourceClient client.Dynamic) ([]unstructured.Unstructured, error) {
func (r *itemCollector) listItemsForLabel(
unstructuredItems []unstructured.Unstructured,
gr schema.GroupResource,
label string,
resourceClient client.Dynamic,
) ([]unstructured.Unstructured, error) {
if r.pageSize > 0 {
// process pager client calls
list, err := r.processPagerClientCalls(gr, label, resourceClient)
@@ -510,7 +675,8 @@ func (r *itemCollector) listItemsForLabel(unstructuredItems []unstructured.Unstr
err = meta.EachListItem(list, func(object runtime.Object) error {
u, ok := object.(*unstructured.Unstructured)
if !ok {
r.log.WithError(errors.WithStack(fmt.Errorf("expected *unstructured.Unstructured but got %T", u))).Error("unable to understand entry in the list")
r.log.WithError(errors.WithStack(fmt.Errorf("expected *unstructured.Unstructured but got %T", object))).
Error("unable to understand entry in the list")
return fmt.Errorf("expected *unstructured.Unstructured but got %T", object)
}
unstructuredItems = append(unstructuredItems, *u)
@@ -532,28 +698,72 @@ func (r *itemCollector) listItemsForLabel(unstructuredItems []unstructured.Unstr
}
// backupNamespaces lists all namespaces, initializes the nsTracker with the
// backup's namespace and label filters, and returns every namespace; untracked
// namespaces are dropped later by nsTracker.filterNamespaces.
func (r *itemCollector) backupNamespaces(unstructuredList *unstructured.UnstructuredList,
ie *collections.IncludesExcludes, gr schema.GroupResource, preferredGVR schema.GroupVersionResource,
log logrus.FieldLogger) []*kubernetesResource {
var items []*kubernetesResource
for index, unstructured := range unstructuredList.Items {
if ie.ShouldInclude(unstructured.GetName()) {
log.Debugf("Backup namespace %s due to namespace filters setting.", unstructured.GetName())
func (r *itemCollector) backupNamespaces(
resource metav1.APIResource,
gv schema.GroupVersion,
gr schema.GroupResource,
preferredGVR schema.GroupVersionResource,
log logrus.FieldLogger,
) ([]*kubernetesResource, error) {
resourceClient, err := r.dynamicFactory.ClientForGroupVersionResource(gv, resource, "")
if err != nil {
log.WithError(err).Error("Error getting dynamic client")
return nil, errors.WithStack(err)
}
path, err := r.writeToFile(&unstructuredList.Items[index])
unstructuredList, err := resourceClient.List(metav1.ListOptions{})
if err != nil {
log.WithError(errors.WithStack(err)).Error("error listing namespaces")
return nil, errors.WithStack(err)
}
var singleSelector labels.Selector
var orSelectors []labels.Selector
if r.backupRequest.Backup.Spec.LabelSelector != nil {
var err error
singleSelector, err = metav1.LabelSelectorAsSelector(
r.backupRequest.Backup.Spec.LabelSelector)
if err != nil {
log.WithError(err).Errorf("Fail to convert backup LabelSelector %s into selector.",
metav1.FormatLabelSelector(r.backupRequest.Backup.Spec.LabelSelector))
}
}
if r.backupRequest.Backup.Spec.OrLabelSelectors != nil {
for _, ls := range r.backupRequest.Backup.Spec.OrLabelSelectors {
orSelector, err := metav1.LabelSelectorAsSelector(ls)
if err != nil {
log.WithError(err).Error("Error writing item to file")
continue
log.WithError(err).Errorf("Fail to convert backup OrLabelSelector %s into selector.",
metav1.FormatLabelSelector(ls))
}
items = append(items, &kubernetesResource{
groupResource: gr,
preferredGVR: preferredGVR,
name: unstructured.GetName(),
path: path,
})
orSelectors = append(orSelectors, orSelector)
}
}
return items
r.nsTracker.init(
unstructuredList.Items,
singleSelector,
orSelectors,
r.backupRequest.NamespaceIncludesExcludes,
log,
)
var items []*kubernetesResource
for index, unstructured := range unstructuredList.Items {
path, err := r.writeToFile(&unstructuredList.Items[index])
if err != nil {
log.WithError(err).Error("Error writing item to file")
continue
}
items = append(items, &kubernetesResource{
groupResource: gr,
preferredGVR: preferredGVR,
name: unstructured.GetName(),
path: path,
})
}
return items, nil
}
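
To summarize the new per-namespace decision (an illustrative sketch, not code from this commit): the helper below mirrors nsTracker.init, with includesAll and includesName standing in, as assumptions, for namespaceFilter.ShouldInclude("*") and namespaceFilter.ShouldInclude(name). It only needs k8s.io/apimachinery/pkg/labels.

    // shouldTrackNamespace mirrors nsTracker.init's per-namespace decision.
    func shouldTrackNamespace(
        nsLabels map[string]string,
        single labels.Selector,
        orSelectors []labels.Selector,
        includesAll bool, // stand-in for namespaceFilter.ShouldInclude("*")
        includesName bool, // stand-in for namespaceFilter.ShouldInclude(name)
    ) bool {
        // 1. Labels match the backup's LabelSelector.
        if single != nil && single.Matches(labels.Set(nsLabels)) {
            return true
        }
        // 2. Labels match any of the backup's OrLabelSelectors.
        for _, s := range orSelectors {
            if s.Matches(labels.Set(nsLabels)) {
                return true
            }
        }
        // 3. With the default namespace filter (*) and a label selector set,
        //    a non-matching namespace is skipped (issue #7105).
        if includesAll && (single != nil || len(orSelectors) > 0) {
            return false
        }
        // 4. Otherwise fall back to the plain namespace include/exclude filter.
        return includesName
    }

Namespaces that merely contain items matching the label selectors are tracked separately while those items are collected (the r.nsTracker.track call added in getResourceItems), so they also survive filterNamespaces.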