Merge pull request #7100 from blackpiglet/6595_volumeinfo_generate

6595 volumeinfo generate
This commit is contained in:
Wenkai Yin(尹文开)
2023-11-22 11:14:36 +08:00
committed by GitHub
23 changed files with 1361 additions and 182 deletions

View File

@@ -23,21 +23,16 @@ import (
"net/http"
"net/http/pprof"
"os"
"reflect"
"strings"
"time"
logrusr "github.com/bombsimon/logrusr/v3"
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotv1client "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
snapshotv1informers "github.com/kubernetes-csi/external-snapshotter/client/v4/informers/externalversions"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
@@ -244,15 +239,17 @@ func NewCommand(f client.Factory) *cobra.Command {
}
type server struct {
namespace string
metricsAddress string
kubeClientConfig *rest.Config
kubeClient kubernetes.Interface
discoveryClient discovery.DiscoveryInterface
discoveryHelper velerodiscovery.Helper
dynamicClient dynamic.Interface
csiSnapshotClient *snapshotv1client.Clientset
csiSnapshotLister snapshotv1listers.VolumeSnapshotLister
namespace string
metricsAddress string
kubeClientConfig *rest.Config
kubeClient kubernetes.Interface
discoveryClient discovery.DiscoveryInterface
discoveryHelper velerodiscovery.Helper
dynamicClient dynamic.Interface
// controller-runtime client. the difference from the controller-manager's client
// is that the the controller-manager's client is limited to list namespaced-scoped
// resources in the namespace where Velero is installed, or the cluster-scoped
// resources. The crClient doesn't have the limitation.
crClient ctrlclient.Client
ctx context.Context
cancelFunc context.CancelFunc
@@ -399,23 +396,6 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s
featureVerifier: featureVerifier,
}
// Setup CSI snapshot client and lister
var csiSnapClient *snapshotv1client.Clientset
if features.IsEnabled(velerov1api.CSIFeatureFlag) {
csiSnapClient, err = snapshotv1client.NewForConfig(clientConfig)
if err != nil {
cancelFunc()
return nil, err
}
s.csiSnapshotClient = csiSnapClient
s.csiSnapshotLister, err = s.getCSIVolumeSnapshotListers()
if err != nil {
cancelFunc()
return nil, err
}
}
return s, nil
}
@@ -615,40 +595,6 @@ func (s *server) initRepoManager() error {
return nil
}
func (s *server) getCSIVolumeSnapshotListers() (vsLister snapshotv1listers.VolumeSnapshotLister, err error) {
_, err = s.discoveryClient.ServerResourcesForGroupVersion(snapshotv1api.SchemeGroupVersion.String())
switch {
case apierrors.IsNotFound(err):
// CSI is enabled, but the required CRDs aren't installed, so halt.
s.logger.Warnf("The '%s' feature flag was specified, but CSI API group [%s] was not found.", velerov1api.CSIFeatureFlag, snapshotv1api.SchemeGroupVersion.String())
err = nil
case err == nil:
wrapper := NewCSIInformerFactoryWrapper(s.csiSnapshotClient)
s.logger.Debug("Creating CSI listers")
// Access the wrapped factory directly here since we've already done the feature flag check above to know it's safe.
vsLister = wrapper.factory.Snapshot().V1().VolumeSnapshots().Lister()
// start the informers & and wait for the caches to sync
wrapper.Start(s.ctx.Done())
s.logger.Info("Waiting for informer caches to sync")
csiCacheSyncResults := wrapper.WaitForCacheSync(s.ctx.Done())
s.logger.Info("Done waiting for informer caches to sync")
for informer, synced := range csiCacheSyncResults {
if !synced {
err = errors.Errorf("cache was not synced for informer %v", informer)
return
}
s.logger.WithField("informer", informer).Info("Informer cache synced")
}
case err != nil:
s.logger.Errorf("fail to find snapshot v1 schema: %s", err)
}
return vsLister, err
}
func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string) error {
s.logger.Info("Starting controllers")
@@ -775,10 +721,10 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.metrics,
backupStoreGetter,
s.config.formatFlag.Parse(),
s.csiSnapshotLister,
s.credentialFileStore,
s.config.maxConcurrentK8SConnections,
s.config.defaultSnapshotMoveData,
s.crClient,
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.Backup)
}
@@ -837,7 +783,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
cmd.CheckError(err)
r := controller.NewBackupFinalizerReconciler(
s.mgr.GetClient(),
s.csiSnapshotLister,
s.crClient,
clock.RealClock{},
backupper,
newPluginManager,
@@ -1027,37 +973,6 @@ func (s *server) runProfiler() {
}
}
// CSIInformerFactoryWrapper is a proxy around the CSI SharedInformerFactory that checks the CSI feature flag before performing operations.
type CSIInformerFactoryWrapper struct {
factory snapshotv1informers.SharedInformerFactory
}
func NewCSIInformerFactoryWrapper(c snapshotv1client.Interface) *CSIInformerFactoryWrapper {
// If no namespace is specified, all namespaces are watched.
// This is desirable for VolumeSnapshots, as we want to query for all VolumeSnapshots across all namespaces using this informer
w := &CSIInformerFactoryWrapper{}
if features.IsEnabled(velerov1api.CSIFeatureFlag) {
w.factory = snapshotv1informers.NewSharedInformerFactoryWithOptions(c, 0)
}
return w
}
// Start proxies the Start call to the CSI SharedInformerFactory.
func (w *CSIInformerFactoryWrapper) Start(stopCh <-chan struct{}) {
if features.IsEnabled(velerov1api.CSIFeatureFlag) {
w.factory.Start(stopCh)
}
}
// WaitForCacheSync proxies the WaitForCacheSync call to the CSI SharedInformerFactory.
func (w *CSIInformerFactoryWrapper) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
if features.IsEnabled(velerov1api.CSIFeatureFlag) {
return w.factory.WaitForCacheSync(stopCh)
}
return nil
}
// if there is a restarting during the reconciling of backups/restores/etc, these CRs may be stuck in progress status
// markInProgressCRsFailed tries to mark the in progress CRs as failed when starting the server to avoid the issue
func markInProgressCRsFailed(ctx context.Context, cfg *rest.Config, scheme *runtime.Scheme, namespace string, log logrus.FieldLogger) {