Files
velero/pkg/cmd/server/server.go
Andy Goldstein e0d2a2d4d0 Remove k8s.io/apiextensions-apiserver from vendor
Remove k8s.io/apiextensions-apiserver since we're no longer registering
CRDs in code.

Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
2017-08-07 12:38:07 -04:00

531 lines
14 KiB
Go

/*
Copyright 2017 Heptio Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/golang/glog"
"github.com/spf13/cobra"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/backup"
"github.com/heptio/ark/pkg/client"
"github.com/heptio/ark/pkg/cloudprovider"
arkaws "github.com/heptio/ark/pkg/cloudprovider/aws"
"github.com/heptio/ark/pkg/cloudprovider/azure"
"github.com/heptio/ark/pkg/cloudprovider/gcp"
"github.com/heptio/ark/pkg/cmd"
"github.com/heptio/ark/pkg/controller"
arkdiscovery "github.com/heptio/ark/pkg/discovery"
"github.com/heptio/ark/pkg/generated/clientset"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/restore"
"github.com/heptio/ark/pkg/restore/restorers"
"github.com/heptio/ark/pkg/util/kube"
)
func NewCommand() *cobra.Command {
var kubeconfig string
var command = &cobra.Command{
Use: "server",
Short: "Run the ark server",
Long: "Run the ark server",
Run: func(c *cobra.Command, args []string) {
s, err := newServer(kubeconfig)
cmd.CheckError(err)
cmd.CheckError(s.run())
},
}
command.Flags().StringVar(&kubeconfig, "kubeconfig", "", "Path to the kubeconfig file to use to talk to the Kubernetes apiserver. If unset, try the environment variable KUBECONFIG, as well as in-cluster configuration")
return command
}
type server struct {
kubeClient kubernetes.Interface
arkClient clientset.Interface
backupService cloudprovider.BackupService
snapshotService cloudprovider.SnapshotService
discoveryClient discovery.DiscoveryInterface
clientPool dynamic.ClientPool
sharedInformerFactory informers.SharedInformerFactory
ctx context.Context
cancelFunc context.CancelFunc
}
func newServer(kubeconfig string) (*server, error) {
clientConfig, err := client.Config(kubeconfig)
if err != nil {
return nil, err
}
kubeClient, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
return nil, err
}
arkClient, err := clientset.NewForConfig(clientConfig)
if err != nil {
return nil, err
}
ctx, cancelFunc := context.WithCancel(context.Background())
s := &server{
kubeClient: kubeClient,
arkClient: arkClient,
discoveryClient: arkClient.Discovery(),
clientPool: dynamic.NewDynamicClientPool(clientConfig),
sharedInformerFactory: informers.NewSharedInformerFactory(arkClient, 0),
ctx: ctx,
cancelFunc: cancelFunc,
}
return s, nil
}
func (s *server) run() error {
if err := s.ensureArkNamespace(); err != nil {
return err
}
config, err := s.loadConfig()
if err != nil {
return err
}
applyConfigDefaults(config)
s.watchConfig(config)
if err := s.initBackupService(config); err != nil {
return err
}
if err := s.initSnapshotService(config); err != nil {
return err
}
if err := s.runControllers(config); err != nil {
return err
}
return nil
}
func (s *server) ensureArkNamespace() error {
glog.Infof("Ensuring %s namespace exists for backups", api.DefaultNamespace)
defaultNamespace := v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: api.DefaultNamespace,
},
}
if created, err := kube.EnsureNamespaceExists(&defaultNamespace, s.kubeClient.CoreV1().Namespaces()); created {
glog.Infof("Namespace created")
} else if err != nil {
return err
}
glog.Infof("Namespace already exists")
return nil
}
func (s *server) loadConfig() (*api.Config, error) {
glog.Infof("Retrieving Ark configuration")
var (
config *api.Config
err error
)
for {
config, err = s.arkClient.ArkV1().Configs(api.DefaultNamespace).Get("default", metav1.GetOptions{})
if err == nil {
break
}
if !apierrors.IsNotFound(err) {
glog.Errorf("error retrieving configuration: %v", err)
}
glog.Infof("Will attempt to retrieve configuration again in 5 seconds")
time.Sleep(5 * time.Second)
}
glog.Infof("Successfully retrieved Ark configuration")
return config, nil
}
const (
defaultGCSyncPeriod = 60 * time.Minute
defaultBackupSyncPeriod = 60 * time.Minute
defaultScheduleSyncPeriod = time.Minute
)
var defaultResourcePriorities = []string{
"namespaces",
"persistentvolumes",
"persistentvolumeclaims",
"secrets",
"configmaps",
}
func applyConfigDefaults(c *api.Config) {
if c.GCSyncPeriod.Duration == 0 {
c.GCSyncPeriod.Duration = defaultGCSyncPeriod
}
if c.BackupSyncPeriod.Duration == 0 {
c.BackupSyncPeriod.Duration = defaultBackupSyncPeriod
}
if c.ScheduleSyncPeriod.Duration == 0 {
c.ScheduleSyncPeriod.Duration = defaultScheduleSyncPeriod
}
if len(c.ResourcePriorities) == 0 {
c.ResourcePriorities = defaultResourcePriorities
glog.Infof("Using default resource priorities: %v", c.ResourcePriorities)
} else {
glog.Infof("Using resource priorities from config: %v", c.ResourcePriorities)
}
}
// watchConfig adds an update event handler to the Config shared informer, invoking s.cancelFunc
// when it sees a change.
func (s *server) watchConfig(config *api.Config) {
s.sharedInformerFactory.Ark().V1().Configs().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
updated := newObj.(*api.Config)
if updated.Name != config.Name {
glog.V(5).Infof("config watch channel received other config %q", updated.Name)
return
}
if !reflect.DeepEqual(config, updated) {
glog.Infof("Detected a config change. Gracefully shutting down")
s.cancelFunc()
}
},
})
}
func (s *server) initBackupService(config *api.Config) error {
glog.Infof("Configuring cloud provider for backup service")
cloud, err := initCloud(config.BackupStorageProvider.CloudProviderConfig, "backupStorageProvider")
if err != nil {
return err
}
s.backupService = cloudprovider.NewBackupService(cloud.ObjectStorage())
return nil
}
func (s *server) initSnapshotService(config *api.Config) error {
glog.Infof("Configuring cloud provider for snapshot service")
cloud, err := initCloud(config.PersistentVolumeProvider, "persistentVolumeProvider")
if err != nil {
return err
}
s.snapshotService = cloudprovider.NewSnapshotService(cloud.BlockStorage())
return nil
}
func initCloud(config api.CloudProviderConfig, field string) (cloudprovider.StorageAdapter, error) {
var (
cloud cloudprovider.StorageAdapter
err error
)
if config.AWS != nil {
cloud, err = getAWSCloudProvider(config)
}
if config.GCP != nil {
if cloud != nil {
return nil, fmt.Errorf("you may only specify one of aws, gcp, or azure for %s", field)
}
cloud, err = getGCPCloudProvider(config)
}
if config.Azure != nil {
if cloud != nil {
return nil, fmt.Errorf("you may only specify one of aws, gcp, or azure for %s", field)
}
cloud, err = getAzureCloudProvider(config)
}
if err != nil {
return nil, err
}
if cloud == nil {
return nil, fmt.Errorf("you must specify one of aws, gcp, or azure for %s", field)
}
return cloud, err
}
func getAWSCloudProvider(cloudConfig api.CloudProviderConfig) (cloudprovider.StorageAdapter, error) {
if cloudConfig.AWS == nil {
return nil, errors.New("missing aws configuration in config file")
}
if cloudConfig.AWS.Region == "" {
return nil, errors.New("missing region in aws configuration in config file")
}
if cloudConfig.AWS.AvailabilityZone == "" {
return nil, errors.New("missing availabilityZone in aws configuration in config file")
}
awsConfig := aws.NewConfig().
WithRegion(cloudConfig.AWS.Region).
WithS3ForcePathStyle(cloudConfig.AWS.S3ForcePathStyle)
if cloudConfig.AWS.S3Url != "" {
awsConfig = awsConfig.WithEndpointResolver(
endpoints.ResolverFunc(func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) {
if service == endpoints.S3ServiceID {
return endpoints.ResolvedEndpoint{
URL: cloudConfig.AWS.S3Url,
}, nil
}
return endpoints.DefaultResolver().EndpointFor(service, region, optFns...)
}),
)
}
return arkaws.NewStorageAdapter(awsConfig, cloudConfig.AWS.AvailabilityZone)
}
func getGCPCloudProvider(cloudConfig api.CloudProviderConfig) (cloudprovider.StorageAdapter, error) {
if cloudConfig.GCP == nil {
return nil, errors.New("missing gcp configuration in config file")
}
if cloudConfig.GCP.Project == "" {
return nil, errors.New("missing project in gcp configuration in config file")
}
if cloudConfig.GCP.Zone == "" {
return nil, errors.New("missing zone in gcp configuration in config file")
}
return gcp.NewStorageAdapter(cloudConfig.GCP.Project, cloudConfig.GCP.Zone)
}
func getAzureCloudProvider(cloudConfig api.CloudProviderConfig) (cloudprovider.StorageAdapter, error) {
if cloudConfig.Azure == nil {
return nil, errors.New("missing azure configuration in config file")
}
if cloudConfig.Azure.Location == "" {
return nil, errors.New("missing location in azure configuration in config file")
}
return azure.NewStorageAdapter(cloudConfig.Azure.Location, cloudConfig.Azure.APITimeout.Duration)
}
func durationMin(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}
func (s *server) runControllers(config *api.Config) error {
glog.Infof("Starting controllers")
ctx := s.ctx
var wg sync.WaitGroup
cloudBackupCacheResyncPeriod := durationMin(config.GCSyncPeriod.Duration, config.BackupSyncPeriod.Duration)
glog.Infof("Caching cloud backups every %s", cloudBackupCacheResyncPeriod)
s.backupService = cloudprovider.NewBackupServiceWithCachedBackupGetter(
ctx,
s.backupService,
cloudBackupCacheResyncPeriod,
)
backupSyncController := controller.NewBackupSyncController(
s.arkClient.ArkV1(),
s.backupService,
config.BackupStorageProvider.Bucket,
config.BackupSyncPeriod.Duration,
)
wg.Add(1)
go func() {
backupSyncController.Run(ctx, 1)
wg.Done()
}()
discoveryHelper, err := arkdiscovery.NewHelper(s.discoveryClient)
if err != nil {
return err
}
go wait.Until(
func() {
if err := discoveryHelper.Refresh(); err != nil {
glog.Errorf("error refreshing discovery: %v", err)
}
},
5*time.Minute,
ctx.Done(),
)
if config.RestoreOnlyMode {
glog.Infof("Restore only mode - not starting the backup, schedule or GC controllers")
} else {
backupper, err := newBackupper(discoveryHelper, s.clientPool, s.backupService, s.snapshotService)
cmd.CheckError(err)
backupController := controller.NewBackupController(
s.sharedInformerFactory.Ark().V1().Backups(),
s.arkClient.ArkV1(),
backupper,
s.backupService,
config.BackupStorageProvider.Bucket,
)
wg.Add(1)
go func() {
backupController.Run(ctx, 1)
wg.Done()
}()
scheduleController := controller.NewScheduleController(
s.arkClient.ArkV1(),
s.arkClient.ArkV1(),
s.sharedInformerFactory.Ark().V1().Schedules(),
config.ScheduleSyncPeriod.Duration,
)
wg.Add(1)
go func() {
scheduleController.Run(ctx, 1)
wg.Done()
}()
gcController := controller.NewGCController(
s.backupService,
s.snapshotService,
config.BackupStorageProvider.Bucket,
config.GCSyncPeriod.Duration,
s.sharedInformerFactory.Ark().V1().Backups(),
s.arkClient.ArkV1(),
)
wg.Add(1)
go func() {
gcController.Run(ctx, 1)
wg.Done()
}()
}
restorer, err := newRestorer(
discoveryHelper,
s.clientPool,
s.backupService,
s.snapshotService,
config.ResourcePriorities,
s.arkClient.ArkV1(),
s.kubeClient,
)
cmd.CheckError(err)
restoreController := controller.NewRestoreController(
s.sharedInformerFactory.Ark().V1().Restores(),
s.arkClient.ArkV1(),
s.arkClient.ArkV1(),
restorer,
s.backupService,
config.BackupStorageProvider.Bucket,
s.sharedInformerFactory.Ark().V1().Backups(),
)
wg.Add(1)
go func() {
restoreController.Run(ctx, 1)
wg.Done()
}()
// SHARED INFORMERS HAVE TO BE STARTED AFTER ALL CONTROLLERS
go s.sharedInformerFactory.Start(ctx.Done())
glog.Infof("Server started successfully")
<-ctx.Done()
glog.Info("Waiting for all controllers to shut down gracefully")
wg.Wait()
return nil
}
func newBackupper(
discoveryHelper arkdiscovery.Helper,
clientPool dynamic.ClientPool,
backupService cloudprovider.BackupService,
snapshotService cloudprovider.SnapshotService,
) (backup.Backupper, error) {
actions := map[string]backup.Action{}
if snapshotService != nil {
actions["persistentvolumes"] = backup.NewVolumeSnapshotAction(snapshotService)
}
return backup.NewKubernetesBackupper(
discoveryHelper,
client.NewDynamicFactory(clientPool),
actions,
)
}
func newRestorer(
discoveryHelper arkdiscovery.Helper,
clientPool dynamic.ClientPool,
backupService cloudprovider.BackupService,
snapshotService cloudprovider.SnapshotService,
resourcePriorities []string,
backupClient arkv1client.BackupsGetter,
kubeClient kubernetes.Interface,
) (restore.Restorer, error) {
restorers := map[string]restorers.ResourceRestorer{
"persistentvolumes": restorers.NewPersistentVolumeRestorer(snapshotService),
"persistentvolumeclaims": restorers.NewPersistentVolumeClaimRestorer(),
"services": restorers.NewServiceRestorer(),
"namespaces": restorers.NewNamespaceRestorer(),
"pods": restorers.NewPodRestorer(),
"jobs": restorers.NewJobRestorer(),
}
return restore.NewKubernetesRestorer(
discoveryHelper,
client.NewDynamicFactory(clientPool),
restorers,
backupService,
resourcePriorities,
backupClient,
kubeClient.CoreV1().Namespaces(),
)
}