Files
velero/pkg/cmd/server/server.go
Andy Goldstein 273b563c86 Add per-restore logs
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
2017-09-13 09:51:40 -04:00

569 lines
15 KiB
Go

/*
Copyright 2017 Heptio Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"context"
"fmt"
"io/ioutil"
"os"
"reflect"
"sync"
"time"
"golang.org/x/oauth2/google"
"github.com/golang/glog"
"github.com/spf13/cobra"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/backup"
"github.com/heptio/ark/pkg/client"
"github.com/heptio/ark/pkg/cloudprovider"
arkaws "github.com/heptio/ark/pkg/cloudprovider/aws"
"github.com/heptio/ark/pkg/cloudprovider/azure"
"github.com/heptio/ark/pkg/cloudprovider/gcp"
"github.com/heptio/ark/pkg/cmd"
"github.com/heptio/ark/pkg/controller"
arkdiscovery "github.com/heptio/ark/pkg/discovery"
"github.com/heptio/ark/pkg/generated/clientset"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/restore"
"github.com/heptio/ark/pkg/restore/restorers"
"github.com/heptio/ark/pkg/util/kube"
)
func NewCommand() *cobra.Command {
var kubeconfig string
var command = &cobra.Command{
Use: "server",
Short: "Run the ark server",
Long: "Run the ark server",
Run: func(c *cobra.Command, args []string) {
s, err := newServer(kubeconfig)
cmd.CheckError(err)
cmd.CheckError(s.run())
},
}
command.Flags().StringVar(&kubeconfig, "kubeconfig", "", "Path to the kubeconfig file to use to talk to the Kubernetes apiserver. If unset, try the environment variable KUBECONFIG, as well as in-cluster configuration")
return command
}
type server struct {
kubeClient kubernetes.Interface
arkClient clientset.Interface
backupService cloudprovider.BackupService
snapshotService cloudprovider.SnapshotService
discoveryClient discovery.DiscoveryInterface
clientPool dynamic.ClientPool
sharedInformerFactory informers.SharedInformerFactory
ctx context.Context
cancelFunc context.CancelFunc
}
func newServer(kubeconfig string) (*server, error) {
clientConfig, err := client.Config(kubeconfig)
if err != nil {
return nil, err
}
kubeClient, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
return nil, err
}
arkClient, err := clientset.NewForConfig(clientConfig)
if err != nil {
return nil, err
}
ctx, cancelFunc := context.WithCancel(context.Background())
s := &server{
kubeClient: kubeClient,
arkClient: arkClient,
discoveryClient: arkClient.Discovery(),
clientPool: dynamic.NewDynamicClientPool(clientConfig),
sharedInformerFactory: informers.NewSharedInformerFactory(arkClient, 0),
ctx: ctx,
cancelFunc: cancelFunc,
}
return s, nil
}
func (s *server) run() error {
if err := s.ensureArkNamespace(); err != nil {
return err
}
config, err := s.loadConfig()
if err != nil {
return err
}
applyConfigDefaults(config)
s.watchConfig(config)
if err := s.initBackupService(config); err != nil {
return err
}
if err := s.initSnapshotService(config); err != nil {
return err
}
if err := s.runControllers(config); err != nil {
return err
}
return nil
}
func (s *server) ensureArkNamespace() error {
glog.Infof("Ensuring %s namespace exists for backups", api.DefaultNamespace)
defaultNamespace := v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: api.DefaultNamespace,
},
}
if created, err := kube.EnsureNamespaceExists(&defaultNamespace, s.kubeClient.CoreV1().Namespaces()); created {
glog.Infof("Namespace created")
} else if err != nil {
return err
}
glog.Infof("Namespace already exists")
return nil
}
func (s *server) loadConfig() (*api.Config, error) {
glog.Infof("Retrieving Ark configuration")
var (
config *api.Config
err error
)
for {
config, err = s.arkClient.ArkV1().Configs(api.DefaultNamespace).Get("default", metav1.GetOptions{})
if err == nil {
break
}
if !apierrors.IsNotFound(err) {
glog.Errorf("error retrieving configuration: %v", err)
}
glog.Infof("Will attempt to retrieve configuration again in 5 seconds")
time.Sleep(5 * time.Second)
}
glog.Infof("Successfully retrieved Ark configuration")
return config, nil
}
const (
defaultGCSyncPeriod = 60 * time.Minute
defaultBackupSyncPeriod = 60 * time.Minute
defaultScheduleSyncPeriod = time.Minute
)
var defaultResourcePriorities = []string{
"namespaces",
"persistentvolumes",
"persistentvolumeclaims",
"secrets",
"configmaps",
}
func applyConfigDefaults(c *api.Config) {
if c.GCSyncPeriod.Duration == 0 {
c.GCSyncPeriod.Duration = defaultGCSyncPeriod
}
if c.BackupSyncPeriod.Duration == 0 {
c.BackupSyncPeriod.Duration = defaultBackupSyncPeriod
}
if c.ScheduleSyncPeriod.Duration == 0 {
c.ScheduleSyncPeriod.Duration = defaultScheduleSyncPeriod
}
if len(c.ResourcePriorities) == 0 {
c.ResourcePriorities = defaultResourcePriorities
glog.Infof("Using default resource priorities: %v", c.ResourcePriorities)
} else {
glog.Infof("Using resource priorities from config: %v", c.ResourcePriorities)
}
}
// watchConfig adds an update event handler to the Config shared informer, invoking s.cancelFunc
// when it sees a change.
func (s *server) watchConfig(config *api.Config) {
s.sharedInformerFactory.Ark().V1().Configs().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
updated := newObj.(*api.Config)
if updated.Name != config.Name {
glog.V(5).Infof("config watch channel received other config %q", updated.Name)
return
}
if !reflect.DeepEqual(config, updated) {
glog.Infof("Detected a config change. Gracefully shutting down")
s.cancelFunc()
}
},
})
}
func (s *server) initBackupService(config *api.Config) error {
glog.Infof("Configuring cloud provider for backup service")
objectStorage, err := getObjectStorageProvider(config.BackupStorageProvider.CloudProviderConfig, "backupStorageProvider")
if err != nil {
return err
}
s.backupService = cloudprovider.NewBackupService(objectStorage)
return nil
}
func (s *server) initSnapshotService(config *api.Config) error {
if config.PersistentVolumeProvider == nil {
glog.Infof("PersistentVolumeProvider config not provided, volume snapshots and restores are disabled")
return nil
}
glog.Infof("Configuring cloud provider for snapshot service")
blockStorage, err := getBlockStorageProvider(*config.PersistentVolumeProvider, "persistentVolumeProvider")
if err != nil {
return err
}
s.snapshotService = cloudprovider.NewSnapshotService(blockStorage)
return nil
}
func hasOneCloudProvider(cloudConfig api.CloudProviderConfig) bool {
found := false
if cloudConfig.AWS != nil {
found = true
}
if cloudConfig.GCP != nil {
if found {
return false
}
found = true
}
if cloudConfig.Azure != nil {
if found {
return false
}
found = true
}
return found
}
func getObjectStorageProvider(cloudConfig api.CloudProviderConfig, field string) (cloudprovider.ObjectStorageAdapter, error) {
var (
objectStorage cloudprovider.ObjectStorageAdapter
err error
)
if !hasOneCloudProvider(cloudConfig) {
return nil, fmt.Errorf("you must specify exactly one of aws, gcp, or azure for %s", field)
}
switch {
case cloudConfig.AWS != nil:
objectStorage, err = arkaws.NewObjectStorageAdapter(
cloudConfig.AWS.Region,
cloudConfig.AWS.S3Url,
cloudConfig.AWS.KMSKeyID,
cloudConfig.AWS.S3ForcePathStyle)
case cloudConfig.GCP != nil:
var email string
var privateKey []byte
credentialsFile := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
if credentialsFile != "" {
// Get the email and private key from the credentials file so we can pre-sign download URLs
creds, err := ioutil.ReadFile(credentialsFile)
if err != nil {
return nil, err
}
jwtConfig, err := google.JWTConfigFromJSON(creds)
if err != nil {
return nil, err
}
email = jwtConfig.Email
privateKey = jwtConfig.PrivateKey
} else {
glog.Warning("GOOGLE_APPLICATION_CREDENTIALS is undefined; some features such as downloading log files will not work")
}
objectStorage, err = gcp.NewObjectStorageAdapter(email, privateKey)
case cloudConfig.Azure != nil:
objectStorage, err = azure.NewObjectStorageAdapter()
}
if err != nil {
return nil, err
}
return objectStorage, nil
}
func getBlockStorageProvider(cloudConfig api.CloudProviderConfig, field string) (cloudprovider.BlockStorageAdapter, error) {
var (
blockStorage cloudprovider.BlockStorageAdapter
err error
)
if !hasOneCloudProvider(cloudConfig) {
return nil, fmt.Errorf("you must specify exactly one of aws, gcp, or azure for %s", field)
}
switch {
case cloudConfig.AWS != nil:
blockStorage, err = arkaws.NewBlockStorageAdapter(cloudConfig.AWS.Region, cloudConfig.AWS.AvailabilityZone)
case cloudConfig.GCP != nil:
blockStorage, err = gcp.NewBlockStorageAdapter(cloudConfig.GCP.Project, cloudConfig.GCP.Zone)
case cloudConfig.Azure != nil:
blockStorage, err = azure.NewBlockStorageAdapter(cloudConfig.Azure.Location, cloudConfig.Azure.APITimeout.Duration)
}
if err != nil {
return nil, err
}
return blockStorage, nil
}
func durationMin(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}
func (s *server) runControllers(config *api.Config) error {
glog.Infof("Starting controllers")
ctx := s.ctx
var wg sync.WaitGroup
cloudBackupCacheResyncPeriod := durationMin(config.GCSyncPeriod.Duration, config.BackupSyncPeriod.Duration)
glog.Infof("Caching cloud backups every %s", cloudBackupCacheResyncPeriod)
s.backupService = cloudprovider.NewBackupServiceWithCachedBackupGetter(
ctx,
s.backupService,
cloudBackupCacheResyncPeriod,
)
backupSyncController := controller.NewBackupSyncController(
s.arkClient.ArkV1(),
s.backupService,
config.BackupStorageProvider.Bucket,
config.BackupSyncPeriod.Duration,
)
wg.Add(1)
go func() {
backupSyncController.Run(ctx, 1)
wg.Done()
}()
discoveryHelper, err := arkdiscovery.NewHelper(s.discoveryClient)
if err != nil {
return err
}
go wait.Until(
func() {
if err := discoveryHelper.Refresh(); err != nil {
glog.Errorf("error refreshing discovery: %v", err)
}
},
5*time.Minute,
ctx.Done(),
)
if config.RestoreOnlyMode {
glog.Infof("Restore only mode - not starting the backup, schedule or GC controllers")
} else {
backupper, err := newBackupper(discoveryHelper, s.clientPool, s.backupService, s.snapshotService)
cmd.CheckError(err)
backupController := controller.NewBackupController(
s.sharedInformerFactory.Ark().V1().Backups(),
s.arkClient.ArkV1(),
backupper,
s.backupService,
config.BackupStorageProvider.Bucket,
s.snapshotService != nil,
)
wg.Add(1)
go func() {
backupController.Run(ctx, 1)
wg.Done()
}()
scheduleController := controller.NewScheduleController(
s.arkClient.ArkV1(),
s.arkClient.ArkV1(),
s.sharedInformerFactory.Ark().V1().Schedules(),
config.ScheduleSyncPeriod.Duration,
)
wg.Add(1)
go func() {
scheduleController.Run(ctx, 1)
wg.Done()
}()
gcController := controller.NewGCController(
s.backupService,
s.snapshotService,
config.BackupStorageProvider.Bucket,
config.GCSyncPeriod.Duration,
s.sharedInformerFactory.Ark().V1().Backups(),
s.arkClient.ArkV1(),
s.sharedInformerFactory.Ark().V1().Restores(),
s.arkClient.ArkV1(),
)
wg.Add(1)
go func() {
gcController.Run(ctx, 1)
wg.Done()
}()
}
restorer, err := newRestorer(
discoveryHelper,
s.clientPool,
s.backupService,
s.snapshotService,
config.ResourcePriorities,
s.arkClient.ArkV1(),
s.kubeClient,
)
cmd.CheckError(err)
restoreController := controller.NewRestoreController(
s.sharedInformerFactory.Ark().V1().Restores(),
s.arkClient.ArkV1(),
s.arkClient.ArkV1(),
restorer,
s.backupService,
config.BackupStorageProvider.Bucket,
s.sharedInformerFactory.Ark().V1().Backups(),
s.snapshotService != nil,
)
wg.Add(1)
go func() {
restoreController.Run(ctx, 1)
wg.Done()
}()
downloadRequestController := controller.NewDownloadRequestController(
s.arkClient.ArkV1(),
s.sharedInformerFactory.Ark().V1().DownloadRequests(),
s.backupService,
config.BackupStorageProvider.Bucket,
)
wg.Add(1)
go func() {
downloadRequestController.Run(ctx, 1)
wg.Done()
}()
// SHARED INFORMERS HAVE TO BE STARTED AFTER ALL CONTROLLERS
go s.sharedInformerFactory.Start(ctx.Done())
glog.Infof("Server started successfully")
<-ctx.Done()
glog.Info("Waiting for all controllers to shut down gracefully")
wg.Wait()
return nil
}
func newBackupper(
discoveryHelper arkdiscovery.Helper,
clientPool dynamic.ClientPool,
backupService cloudprovider.BackupService,
snapshotService cloudprovider.SnapshotService,
) (backup.Backupper, error) {
actions := map[string]backup.Action{}
if snapshotService != nil {
action, err := backup.NewVolumeSnapshotAction(snapshotService)
if err != nil {
return nil, err
}
actions["persistentvolumes"] = action
}
return backup.NewKubernetesBackupper(
discoveryHelper,
client.NewDynamicFactory(clientPool),
actions,
)
}
func newRestorer(
discoveryHelper arkdiscovery.Helper,
clientPool dynamic.ClientPool,
backupService cloudprovider.BackupService,
snapshotService cloudprovider.SnapshotService,
resourcePriorities []string,
backupClient arkv1client.BackupsGetter,
kubeClient kubernetes.Interface,
) (restore.Restorer, error) {
restorers := map[string]restorers.ResourceRestorer{
"persistentvolumes": restorers.NewPersistentVolumeRestorer(snapshotService),
"persistentvolumeclaims": restorers.NewPersistentVolumeClaimRestorer(),
"services": restorers.NewServiceRestorer(),
"namespaces": restorers.NewNamespaceRestorer(),
"pods": restorers.NewPodRestorer(),
"jobs": restorers.NewJobRestorer(),
}
return restore.NewKubernetesRestorer(
discoveryHelper,
client.NewDynamicFactory(clientPool),
restorers,
backupService,
resourcePriorities,
backupClient,
kubeClient.CoreV1().Namespaces(),
)
}