diff --git a/internal/controller/prepare_controllers.go b/internal/controller/prepare_controllers.go new file mode 100644 index 000000000..96d41ac59 --- /dev/null +++ b/internal/controller/prepare_controllers.go @@ -0,0 +1,139 @@ +/* +Copyright 2020 VMware, Inc. +SPDX-License-Identifier: Apache-2.0 +*/ + +package controller + +import ( + "context" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/runtime" + k8sinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" + aggregationv1client "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" + + "github.com/suzerain-io/controller-go" + "github.com/suzerain-io/placeholder-name/internal/autoregistration" + "github.com/suzerain-io/placeholder-name/internal/controller/logindiscovery" + placeholderclientset "github.com/suzerain-io/placeholder-name/kubernetes/1.19/client-go/clientset/versioned" + placeholderinformers "github.com/suzerain-io/placeholder-name/kubernetes/1.19/client-go/informers/externalversions" +) + +const ( + singletonWorker = 1 + defaultResyncInterval = 3 * time.Minute +) + +// Prepare the controllers and their informers and return a function that will start them when called. +func PrepareControllers( + ctx context.Context, + caBundle []byte, + serverInstallationNamespace string, + discoveryURLOverride *string, +) (func(ctx context.Context), error) { + // Create k8s clients. + k8sClient, aggregationClient, placeholderClient, err := createClients() + if err != nil { + return nil, fmt.Errorf("could not create clients for the controllers: %w", err) + } + + // TODO Putting this here temporarily on the way toward moving it elsewhere. + // When it moves elsewhere then PrepareControllers() will not need to take ctx and caBundle parameters. + if err := autoregistration.UpdateAPIService(ctx, aggregationClient, caBundle); err != nil { + return nil, fmt.Errorf("could not update the API service: %w", err) + } + + // Create informers. + k8sInformers, placeholderInformers := createInformers(serverInstallationNamespace, k8sClient, placeholderClient) + + // Create controller manager. + controllerManager := controller. + NewManager(). + WithController( + logindiscovery.NewPublisherController( + serverInstallationNamespace, + discoveryURLOverride, + placeholderClient, + k8sInformers.Core().V1().ConfigMaps(), + placeholderInformers.Crds().V1alpha1().LoginDiscoveryConfigs(), + controller.WithInformer, + ), + singletonWorker, + ) + + // Return a function which starts the informers and controllers. + return func(ctx context.Context) { + k8sInformers.Start(ctx.Done()) + placeholderInformers.Start(ctx.Done()) + go controllerManager.Start(ctx) + }, nil +} + +// Create the k8s clients that will be used by the controllers. +func createClients() (*kubernetes.Clientset, *aggregationv1client.Clientset, *placeholderclientset.Clientset, error) { + // Load the Kubernetes client configuration (kubeconfig), + kubeConfig, err := restclient.InClusterConfig() + if err != nil { + return nil, nil, nil, fmt.Errorf("could not load in-cluster configuration: %w", err) + } + + // explicitly use protobuf when talking to built-in kube APIs + protoKubeConfig := createProtoKubeConfig(kubeConfig) + + // Connect to the core Kubernetes API. + k8sClient, err := kubernetes.NewForConfig(protoKubeConfig) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not initialize Kubernetes client: %w", err) + } + + // Connect to the Kubernetes aggregation API. + aggregationClient, err := aggregationv1client.NewForConfig(protoKubeConfig) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not initialize Kubernetes client: %w", err) + } + + // Connect to the placeholder API. + // I think we can't use protobuf encoding here because we are using CRDs + // (for which protobuf encoding is not supported). + placeholderClient, err := placeholderclientset.NewForConfig(kubeConfig) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not initialize placeholder client: %w", err) + } + + return k8sClient, aggregationClient, placeholderClient, nil +} + +// Create the informers that will be used by the controllers. +func createInformers( + serverInstallationNamespace string, + k8sClient *kubernetes.Clientset, + placeholderClient *placeholderclientset.Clientset, +) (k8sinformers.SharedInformerFactory, placeholderinformers.SharedInformerFactory) { + k8sInformers := k8sinformers.NewSharedInformerFactoryWithOptions( + k8sClient, + defaultResyncInterval, + k8sinformers.WithNamespace( + logindiscovery.ClusterInfoNamespace, + ), + ) + placeholderInformers := placeholderinformers.NewSharedInformerFactoryWithOptions( + placeholderClient, + defaultResyncInterval, + placeholderinformers.WithNamespace(serverInstallationNamespace), + ) + return k8sInformers, placeholderInformers +} + +// Returns a copy of the input config with the ContentConfig set to use protobuf. +// Do not use this config to communicate with any CRD based APIs. +func createProtoKubeConfig(kubeConfig *restclient.Config) *restclient.Config { + protoKubeConfig := restclient.CopyConfig(kubeConfig) + const protoThenJSON = runtime.ContentTypeProtobuf + "," + runtime.ContentTypeJSON + protoKubeConfig.AcceptContentTypes = protoThenJSON + protoKubeConfig.ContentType = runtime.ContentTypeProtobuf + return protoKubeConfig +} diff --git a/internal/server/server.go b/internal/server/server.go index c4360763c..6ec828ce9 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -17,44 +17,28 @@ import ( "time" "github.com/spf13/cobra" - "k8s.io/apimachinery/pkg/runtime" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/dynamiccertificates" genericoptions "k8s.io/apiserver/pkg/server/options" "k8s.io/apiserver/plugin/pkg/authenticator/token/webhook" - k8sinformers "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" - aggregationv1client "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" - "github.com/suzerain-io/controller-go" "github.com/suzerain-io/placeholder-name/internal/apiserver" - "github.com/suzerain-io/placeholder-name/internal/autoregistration" "github.com/suzerain-io/placeholder-name/internal/certauthority" - "github.com/suzerain-io/placeholder-name/internal/controller/logindiscovery" + "github.com/suzerain-io/placeholder-name/internal/controller" "github.com/suzerain-io/placeholder-name/internal/downward" placeholderv1alpha1 "github.com/suzerain-io/placeholder-name/kubernetes/1.19/api/apis/placeholder/v1alpha1" - placeholderclientset "github.com/suzerain-io/placeholder-name/kubernetes/1.19/client-go/clientset/versioned" - placeholderinformers "github.com/suzerain-io/placeholder-name/kubernetes/1.19/client-go/informers/externalversions" "github.com/suzerain-io/placeholder-name/pkg/config" ) -const ( - singletonWorker = 1 - defaultResyncInterval = 3 * time.Minute -) - // App is an object that represents the placeholder-name-server application. type App struct { - cmd *cobra.Command + serverCommand *cobra.Command // CLI flags configPath string downwardAPIPath string clusterSigningCertFilePath string clusterSigningKeyFilePath string - - recommendedOptions *genericoptions.RecommendedOptions } // This is ignored for now because we turn off etcd storage below, but this is @@ -63,61 +47,39 @@ const defaultEtcdPathPrefix = "/registry/" + placeholderv1alpha1.GroupName // New constructs a new App with command line args, stdout and stderr. func New(ctx context.Context, args []string, stdout, stderr io.Writer) *App { - a := &App{ - recommendedOptions: genericoptions.NewRecommendedOptions( - defaultEtcdPathPrefix, - apiserver.Codecs.LegacyCodec(placeholderv1alpha1.SchemeGroupVersion), - // TODO we should check to see if all the other default settings are acceptable for us - ), - } - a.recommendedOptions.Etcd = nil // turn off etcd storage because we don't need it yet + app := &App{} + app.addServerCommand(ctx, args, stdout, stderr) + return app +} +// Run the server. +func (app *App) Run() error { + return app.serverCommand.Execute() +} + +// Create the server command and save it into the App. +func (app *App) addServerCommand(ctx context.Context, args []string, stdout, stderr io.Writer) { cmd := &cobra.Command{ Use: `placeholder-name-server`, - Long: `placeholder-name-server provides a generic API for mapping an external -credential from somewhere to an internal credential to be used for -authenticating to the Kubernetes API.`, - RunE: func(cmd *cobra.Command, args []string) error { - // Load the Kubernetes client configuration (kubeconfig), - kubeConfig, err := restclient.InClusterConfig() - if err != nil { - return fmt.Errorf("could not load in-cluster configuration: %w", err) - } - - // explicitly use protobuf when talking to built-in kube APIs - protoKubeConfig := createProtoKubeConfig(kubeConfig) - - // Connect to the core Kubernetes API. - k8sClient, err := kubernetes.NewForConfig(protoKubeConfig) - if err != nil { - return fmt.Errorf("could not initialize Kubernetes client: %w", err) - } - - // Connect to the Kubernetes aggregation API. - aggregationClient, err := aggregationv1client.NewForConfig(protoKubeConfig) - if err != nil { - return fmt.Errorf("could not initialize Kubernetes client: %w", err) - } - - // Connect to the placeholder API. - // I think we can't use protobuf encoding here because we are using CRDs - // (for which protobuf encoding is not supported). - placeholderClient, err := placeholderclientset.NewForConfig(kubeConfig) - if err != nil { - return fmt.Errorf("could not initialize placeholder client: %w", err) - } - - return a.run(ctx, k8sClient, aggregationClient, placeholderClient) - }, + Long: "placeholder-name-server provides a generic API for mapping an external\n" + + "credential from somewhere to an internal credential to be used for\n" + + "authenticating to the Kubernetes API.", + RunE: func(cmd *cobra.Command, args []string) error { return app.runServer(ctx) }, Args: cobra.NoArgs, } cmd.SetArgs(args) cmd.SetOut(stdout) cmd.SetErr(stderr) + addCommandlineFlagsToCommand(cmd, app) + app.serverCommand = cmd +} + +// Define the app's commandline flags. +func addCommandlineFlagsToCommand(cmd *cobra.Command, app *App) { cmd.Flags().StringVarP( - &a.configPath, + &app.configPath, "config", "c", "placeholder-name.yaml", @@ -125,73 +87,64 @@ authenticating to the Kubernetes API.`, ) cmd.Flags().StringVar( - &a.downwardAPIPath, + &app.downwardAPIPath, "downward-api-path", "/etc/podinfo", "path to Downward API volume mount", ) cmd.Flags().StringVar( - &a.clusterSigningCertFilePath, + &app.clusterSigningCertFilePath, "cluster-signing-cert-file", "", "path to cluster signing certificate", ) cmd.Flags().StringVar( - &a.clusterSigningKeyFilePath, + &app.clusterSigningKeyFilePath, "cluster-signing-key-file", "", "path to cluster signing private key", ) - - a.cmd = cmd - - return a } -func (a *App) Run() error { - return a.cmd.Execute() -} - -func (a *App) run( - ctx context.Context, - k8sClient kubernetes.Interface, - aggregationClient aggregationv1client.Interface, - placeholderClient placeholderclientset.Interface, -) error { - cfg, err := config.FromPath(a.configPath) +// Boot the aggregated API server, which will in turn boot the controllers. +func (app *App) runServer(ctx context.Context) error { + // Read the server config file. + cfg, err := config.FromPath(app.configPath) if err != nil { return fmt.Errorf("could not load config: %w", err) } // Load the Kubernetes cluster signing CA. - clientCA, err := certauthority.Load(a.clusterSigningCertFilePath, a.clusterSigningKeyFilePath) + k8sClusterCA, err := certauthority.Load(app.clusterSigningCertFilePath, app.clusterSigningKeyFilePath) if err != nil { return fmt.Errorf("could not load cluster signing CA: %w", err) } + // Create a WebhookTokenAuthenticator. webhookTokenAuthenticator, err := config.NewWebhook(cfg.WebhookConfig) if err != nil { return fmt.Errorf("could not create webhook client: %w", err) } - podinfo, err := downward.Load(a.downwardAPIPath) + // Discover in which namespace we are installed. + podInfo, err := downward.Load(app.downwardAPIPath) if err != nil { return fmt.Errorf("could not read pod metadata: %w", err) } - serverInstallationNamespace := podinfo.Namespace - - // TODO use the postStart hook to generate certs? + serverInstallationNamespace := podInfo.Namespace + // Create a CA. aggregatedAPIServerCA, err := certauthority.New(pkix.Name{CommonName: "Placeholder CA"}) if err != nil { return fmt.Errorf("could not initialize CA: %w", err) } + // This string must match the name of the Service declared in the deployment yaml. const serviceName = "placeholder-name-api" - - cert, err := aggregatedAPIServerCA.Issue( + // Using the CA from above, create a TLS server cert for the aggregated API server to use. + aggregatedAPIServerTLSCert, err := aggregatedAPIServerCA.Issue( pkix.Name{CommonName: serviceName + "." + serverInstallationNamespace + ".svc"}, []string{}, 24*365*time.Hour, @@ -200,35 +153,41 @@ func (a *App) run( return fmt.Errorf("could not issue serving certificate: %w", err) } - if err := autoregistration.UpdateAPIService(ctx, aggregationClient, aggregatedAPIServerCA.Bundle()); err != nil { - return fmt.Errorf("could not register API service: %w", err) - } - - cmrf := wireControllerManagerRunFunc( + // Prepare to start the controllers, but defer actually starting them until the + // post start hook of the aggregated API server. + startControllersFunc, err := controller.PrepareControllers( + ctx, + aggregatedAPIServerCA.Bundle(), serverInstallationNamespace, cfg.DiscoveryConfig.URL, - k8sClient, - placeholderClient, ) - apiServerConfig, err := a.configServer( - cert, + if err != nil { + return fmt.Errorf("could not prepare controllers: %w", err) + } + + // Get the aggregated API server config. + aggregatedAPIServerConfig, err := getAggregatedAPIServerConfig( + aggregatedAPIServerTLSCert, webhookTokenAuthenticator, - clientCA, - cmrf, + k8sClusterCA, + startControllersFunc, ) if err != nil { - return err + return fmt.Errorf("could not configure aggregated API server: %w", err) } - server, err := apiServerConfig.Complete().New() + // Complete the aggregated API server config and make a server instance. + server, err := aggregatedAPIServerConfig.Complete().New() if err != nil { - return fmt.Errorf("could not issue serving certificate: %w", err) + return fmt.Errorf("could not create aggregated API server: %w", err) } + // Run the server. Its post-start hook will start the controllers. return server.GenericAPIServer.PrepareRun().Run(ctx.Done()) } -func (a *App) configServer( +// Create a configuration for the aggregated API server. +func getAggregatedAPIServerConfig( cert *tls.Certificate, webhookTokenAuthenticator *webhook.WebhookTokenAuthenticator, ca *certauthority.CA, @@ -238,10 +197,17 @@ func (a *App) configServer( if err != nil { return nil, fmt.Errorf("could not create static cert key provider: %w", err) } - a.recommendedOptions.SecureServing.ServerCert.GeneratedCert = provider + + recommendedOptions := genericoptions.NewRecommendedOptions( + defaultEtcdPathPrefix, + apiserver.Codecs.LegacyCodec(placeholderv1alpha1.SchemeGroupVersion), + // TODO we should check to see if all the other default settings are acceptable for us + ) + recommendedOptions.Etcd = nil // turn off etcd storage because we don't need it yet + recommendedOptions.SecureServing.ServerCert.GeneratedCert = provider serverConfig := genericapiserver.NewRecommendedConfig(apiserver.Codecs) - if err := a.recommendedOptions.ApplyTo(serverConfig); err != nil { + if err := recommendedOptions.ApplyTo(serverConfig); err != nil { return nil, err } @@ -256,16 +222,6 @@ func (a *App) configServer( return apiServerConfig, nil } -// createProtoKubeConfig returns a copy of the input config with the ContentConfig set to use protobuf. -// do not use this config to communicate with any CRD based APIs. -func createProtoKubeConfig(kubeConfig *restclient.Config) *restclient.Config { - protoKubeConfig := restclient.CopyConfig(kubeConfig) - const protoThenJSON = runtime.ContentTypeProtobuf + "," + runtime.ContentTypeJSON - protoKubeConfig.AcceptContentTypes = protoThenJSON - protoKubeConfig.ContentType = runtime.ContentTypeProtobuf - return protoKubeConfig -} - func createStaticCertKeyProvider(cert *tls.Certificate) (dynamiccertificates.CertKeyContentProvider, error) { privateKeyDER, err := x509.MarshalPKCS8PrivateKey(cert.PrivateKey) if err != nil { @@ -289,41 +245,3 @@ func createStaticCertKeyProvider(cert *tls.Certificate) (dynamiccertificates.Cer return dynamiccertificates.NewStaticCertKeyContent("some-name???", certChainPEM, privateKeyPEM) } - -func wireControllerManagerRunFunc( - serverInstallationNamespace string, - discoveryURLOverride *string, - k8s kubernetes.Interface, - placeholder placeholderclientset.Interface, -) func(ctx context.Context) { - k8sInformers := k8sinformers.NewSharedInformerFactoryWithOptions( - k8s, - defaultResyncInterval, - k8sinformers.WithNamespace( - logindiscovery.ClusterInfoNamespace, - ), - ) - placeholderInformers := placeholderinformers.NewSharedInformerFactoryWithOptions( - placeholder, - defaultResyncInterval, - placeholderinformers.WithNamespace(serverInstallationNamespace), - ) - cm := controller. - NewManager(). - WithController( - logindiscovery.NewPublisherController( - serverInstallationNamespace, - discoveryURLOverride, - placeholder, - k8sInformers.Core().V1().ConfigMaps(), - placeholderInformers.Crds().V1alpha1().LoginDiscoveryConfigs(), - controller.WithInformer, - ), - singletonWorker, - ) - return func(ctx context.Context) { - k8sInformers.Start(ctx.Done()) - placeholderInformers.Start(ctx.Done()) - go cm.Start(ctx) - } -} diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 5ae9abe88..fc31dc4e4 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -78,7 +78,7 @@ func TestCommand(t *testing.T) { stderr := bytes.NewBuffer([]byte{}) a := New(context.Background(), test.args, stdout, stderr) - a.cmd.RunE = func(cmd *cobra.Command, args []string) error { + a.serverCommand.RunE = func(cmd *cobra.Command, args []string) error { return nil } err := a.Run()