Initial aggregated API server (#15)

Add initial aggregated API server (squashed from a bunch of commits).

Signed-off-by: Andrew Keesler <akeesler@vmware.com>
Signed-off-by: Aram Price <pricear@vmware.com>
Signed-off-by: Ryan Richard <richardry@vmware.com>
This commit is contained in:
Mo Khan
2020-07-23 11:05:21 -04:00
committed by GitHub
parent 23c1b32a02
commit 5fdc20886d
14 changed files with 906 additions and 152 deletions

View File

@@ -9,38 +9,40 @@ package app
import (
"context"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"errors"
"encoding/pem"
"fmt"
"io"
"log"
"net"
"net/http"
"time"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
aggregationv1client "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
placeholderv1alpha1 "github.com/suzerain-io/placeholder-name-api/pkg/apis/placeholder/v1alpha1"
"github.com/suzerain-io/placeholder-name/internal/autoregistration"
"github.com/suzerain-io/placeholder-name/internal/certauthority"
"github.com/suzerain-io/placeholder-name/internal/downward"
"github.com/suzerain-io/placeholder-name/pkg/config"
"github.com/suzerain-io/placeholder-name/pkg/handlers"
)
// shutdownGracePeriod controls how long active connections are allowed to continue at shutdown.
const shutdownGracePeriod = 5 * time.Second
"k8s.io/apimachinery/pkg/runtime"
"github.com/spf13/cobra"
genericapiserver "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/plugin/pkg/authenticator/token/webhook"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
aggregationv1client "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
placeholderv1alpha1 "github.com/suzerain-io/placeholder-name-api/pkg/apis/placeholder/v1alpha1"
"github.com/suzerain-io/placeholder-name/pkg/apiserver"
"github.com/suzerain-io/placeholder-name/pkg/config"
)
// App is an object that represents the placeholder-name application.
type App struct {
@@ -50,22 +52,23 @@ type App struct {
configPath string
downwardAPIPath string
// listen address for healthz serve
healthAddr string
// listen address for main serve
mainAddr string
// webhook authenticates tokens
webhook authenticator.Token
recommendedOptions *genericoptions.RecommendedOptions
}
// This is ignored for now because we turn off etcd storage below, but this is
// the right prefix in case we turn it back on.
const defaultEtcdPathPrefix = "/registry/" + placeholderv1alpha1.GroupName
// New constructs a new App with command line args, stdout and stderr.
func New(args []string, stdout, stderr io.Writer) *App {
func New(ctx context.Context, args []string, stdout, stderr io.Writer) *App {
a := &App{
healthAddr: ":8080",
mainAddr: ":443",
recommendedOptions: genericoptions.NewRecommendedOptions(
defaultEtcdPathPrefix,
apiserver.Codecs.LegacyCodec(placeholderv1alpha1.SchemeGroupVersion),
// TODO we should check to see if all the other default settings are acceptable for us
),
}
a.recommendedOptions.Etcd = nil // turn off etcd storage because we don't need it yet
cmd := &cobra.Command{
Use: `placeholder-name`,
@@ -93,7 +96,8 @@ authenticating to the Kubernetes API.`,
if err != nil {
return fmt.Errorf("could not initialize Kubernetes client: %w", err)
}
return a.serve(context.Background(), k8s.CoreV1(), aggregation)
return a.run(ctx, k8s.CoreV1(), aggregation)
},
Args: cobra.NoArgs,
}
@@ -126,23 +130,28 @@ func (a *App) Run() error {
return a.cmd.Execute()
}
func (a *App) serve(ctx context.Context, k8s corev1client.CoreV1Interface, aggregation aggregationv1client.Interface) error {
func (a *App) run(
ctx context.Context,
k8s corev1client.CoreV1Interface,
aggregation aggregationv1client.Interface,
) error {
cfg, err := config.FromPath(a.configPath)
if err != nil {
return fmt.Errorf("could not load config: %w", err)
}
webhook, err := config.NewWebhook(cfg.WebhookConfig)
webhookTokenAuthenticator, err := config.NewWebhook(cfg.WebhookConfig)
if err != nil {
return fmt.Errorf("could create webhook client: %w", err)
}
a.webhook = webhook
podinfo, err := downward.Load(a.downwardAPIPath)
if err != nil {
return fmt.Errorf("could not read pod metadata: %w", err)
}
// TODO use the postStart hook to generate certs?
ca, err := certauthority.New(pkix.Name{CommonName: "Placeholder CA"})
if err != nil {
return fmt.Errorf("could not initialize CA: %w", err)
@@ -153,27 +162,26 @@ func (a *App) serve(ctx context.Context, k8s corev1client.CoreV1Interface, aggre
}
log.Printf("initialized CA bundle:\n%s", string(caBundle))
const serviceName = "placeholder-name-api"
cert, err := ca.Issue(
pkix.Name{CommonName: "Placeholder Server"},
[]string{"placeholder-serve"},
pkix.Name{CommonName: serviceName + "." + podinfo.Namespace + ".svc"},
[]string{},
24*365*time.Hour,
)
if err != nil {
return fmt.Errorf("could not issue serving certificate: %w", err)
}
// Start an errgroup to manage the lifetimes of the various listener goroutines.
eg, ctx := errgroup.WithContext(ctx)
// Dynamically register our v1alpha1 API service.
service := corev1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "placeholder-name-api"},
ObjectMeta: metav1.ObjectMeta{Name: serviceName},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Protocol: corev1.ProtocolTCP,
Port: 443,
TargetPort: intstr.IntOrString{IntVal: 443}, //TODO: parse this out of mainAddr
TargetPort: intstr.IntOrString{IntVal: 443},
},
},
Selector: podinfo.Labels,
@@ -188,8 +196,8 @@ func (a *App) serve(ctx context.Context, k8s corev1client.CoreV1Interface, aggre
Group: placeholderv1alpha1.GroupName,
Version: placeholderv1alpha1.SchemeGroupVersion.Version,
CABundle: caBundle,
GroupPriorityMinimum: 2500,
VersionPriority: 10,
GroupPriorityMinimum: 2500, // TODO what is the right value? https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#apiservicespec-v1beta1-apiregistration-k8s-io
VersionPriority: 10, // TODO what is the right value? https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#apiservicespec-v1beta1-apiregistration-k8s-io
},
}
if err := autoregistration.Setup(ctx, autoregistration.SetupOptions{
@@ -202,66 +210,38 @@ func (a *App) serve(ctx context.Context, k8s corev1client.CoreV1Interface, aggre
return fmt.Errorf("could not register API service: %w", err)
}
// Start healthz listener
eg.Go(func() error {
log.Printf("Starting healthz serve on %v", a.healthAddr)
server := http.Server{
BaseContext: func(_ net.Listener) context.Context { return ctx },
Addr: a.healthAddr,
Handler: handlers.New(),
}
return runGracefully(ctx, &server, eg, server.ListenAndServe)
})
// Start main service listener
eg.Go(func() error {
log.Printf("Starting main serve on %v", a.mainAddr)
server := http.Server{
BaseContext: func(_ net.Listener) context.Context { return ctx },
Addr: a.mainAddr,
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
Certificates: []tls.Certificate{*cert},
},
Handler: http.HandlerFunc(a.exampleHandler),
}
return runGracefully(ctx, &server, eg, func() error {
// Doc for ListenAndServeTLS says we can pass empty strings if we configured
// keypair for TLS in http.Server.TLSConfig.
return server.ListenAndServeTLS("", "")
})
})
if err := eg.Wait(); !errors.Is(err, http.ErrServerClosed) {
apiServerConfig, err := a.ConfigServer(cert, webhookTokenAuthenticator)
if err != nil {
return err
}
return nil
server, err := apiServerConfig.Complete().New()
if err != nil {
return fmt.Errorf("could not issue serving certificate: %w", err)
}
return server.GenericAPIServer.PrepareRun().Run(ctx.Done())
}
// exampleHandler is a stub to be replaced with our real server logic.
func (a *App) exampleHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
defer cancel()
func (a *App) ConfigServer(cert *tls.Certificate, webhookTokenAuthenticator *webhook.WebhookTokenAuthenticator) (*apiserver.Config, error) {
provider, err := createStaticCertKeyProvider(cert)
if err != nil {
return nil, fmt.Errorf("could not create static cert key provider: %w", err)
}
a.recommendedOptions.SecureServing.ServerCert.GeneratedCert = provider
rsp, authenticated, err := a.webhook.AuthenticateToken(ctx, "")
log.Printf("token response: %+v", rsp)
log.Printf("token authenticated: %+v", authenticated)
log.Printf("token err: %+v", err)
serverConfig := genericapiserver.NewRecommendedConfig(apiserver.Codecs)
if err := a.recommendedOptions.ApplyTo(serverConfig); err != nil {
return nil, err
}
_, _ = w.Write([]byte("hello world"))
}
// runGracefully runs an http.Server with graceful shutdown.
func runGracefully(ctx context.Context, srv *http.Server, eg *errgroup.Group, f func() error) error {
// Start the listener in a child goroutine.
eg.Go(f)
// If/when the context is canceled or times out, initiate shutting down the serve.
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownGracePeriod)
defer cancel()
return srv.Shutdown(shutdownCtx)
apiServerConfig := &apiserver.Config{
GenericConfig: serverConfig,
ExtraConfig: apiserver.ExtraConfig{
Webhook: webhookTokenAuthenticator,
},
}
return apiServerConfig, nil
}
// createProtoKubeConfig returns a copy of the input config with the ContentConfig set to use protobuf.
@@ -273,3 +253,27 @@ func createProtoKubeConfig(kubeConfig *restclient.Config) *restclient.Config {
protoKubeConfig.ContentType = runtime.ContentTypeProtobuf
return protoKubeConfig
}
func createStaticCertKeyProvider(cert *tls.Certificate) (dynamiccertificates.CertKeyContentProvider, error) {
privateKeyDER, err := x509.MarshalPKCS8PrivateKey(cert.PrivateKey)
if err != nil {
return nil, fmt.Errorf("error marshalling private key: %w", err)
}
privateKeyPEM := pem.EncodeToMemory(&pem.Block{
Type: "PRIVATE KEY",
Headers: nil,
Bytes: privateKeyDER,
})
certChainPEM := make([]byte, 0)
for _, certFromChain := range cert.Certificate {
certPEMBytes := pem.EncodeToMemory(&pem.Block{
Type: "CERTIFICATE",
Headers: nil,
Bytes: certFromChain,
})
certChainPEM = append(certChainPEM, certPEMBytes...)
}
return dynamiccertificates.NewStaticCertKeyContent("some-name???", certChainPEM, privateKeyPEM)
}

View File

@@ -10,14 +10,10 @@ import (
"context"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1fake "k8s.io/client-go/kubernetes/fake"
aggregationv1fake "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake"
)
const knownGoodUsage = `
@@ -29,9 +25,10 @@ Usage:
placeholder-name [flags]
Flags:
-c, --config string path to configuration file (default "placeholder-name.yaml")
--downward-api-path string path to Downward API volume mount (default "/etc/podinfo")
-h, --help help for placeholder-name
-c, --config string path to configuration file (default "placeholder-name.yaml")
--downward-api-path string path to Downward API volume mount (default "/etc/podinfo")
-h, --help help for placeholder-name
--log-flush-frequency duration Maximum number of seconds between log flushes (default 5s)
`
func TestCommand(t *testing.T) {
@@ -78,7 +75,7 @@ func TestCommand(t *testing.T) {
stdout := bytes.NewBuffer([]byte{})
stderr := bytes.NewBuffer([]byte{})
a := New(test.args, stdout, stderr)
a := New(context.Background(), test.args, stdout, stderr)
a.cmd.RunE = func(cmd *cobra.Command, args []string) error {
return nil
}
@@ -89,45 +86,8 @@ func TestCommand(t *testing.T) {
require.NoError(t, err)
}
if test.wantStdout != "" {
require.Equal(t, strings.TrimSpace(test.wantStdout), strings.TrimSpace(stdout.String()))
require.Equal(t, strings.TrimSpace(test.wantStdout), strings.TrimSpace(stdout.String()), cmp.Diff(test.wantStdout, stdout.String()))
}
})
}
}
func TestServeApp(t *testing.T) {
t.Parallel()
fakev1 := corev1fake.NewSimpleClientset(&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "test-namespace"}})
fakeaggregationv1 := aggregationv1fake.NewSimpleClientset()
t.Run("success", func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
cancel()
a := App{
healthAddr: "127.0.0.1:0",
mainAddr: "127.0.0.1:8443",
configPath: "testdata/valid-config.yaml",
downwardAPIPath: "testdata/podinfo",
}
err := a.serve(ctx, fakev1.CoreV1(), fakeaggregationv1)
require.NoError(t, err)
})
t.Run("failure", func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a := App{
healthAddr: "127.0.0.1:8081",
mainAddr: "127.0.0.1:8081",
configPath: "testdata/valid-config.yaml",
downwardAPIPath: "testdata/podinfo",
}
err := a.serve(ctx, fakev1.CoreV1(), fakeaggregationv1)
require.EqualError(t, err, "listen tcp 127.0.0.1:8081: bind: address already in use")
})
}

View File

@@ -8,16 +8,24 @@ package main
import (
"os"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/pkg/version"
"k8s.io/client-go/rest"
"k8s.io/component-base/logs"
"k8s.io/klog/v2"
"github.com/suzerain-io/placeholder-name/cmd/placeholder-name/app"
)
func main() {
logs.InitLogs()
defer logs.FlushLogs()
klog.Infof("Running %s at %#v", rest.DefaultKubernetesUserAgent(), version.Get())
if err := app.New(os.Args[1:], os.Stdout, os.Stderr).Run(); err != nil {
os.Exit(1)
ctx := genericapiserver.SetupSignalContext()
if err := app.New(ctx, os.Args[1:], os.Stdout, os.Stderr).Run(); err != nil {
klog.Fatal(err)
}
}