From 914165095a4c34aa9a08023510a5b14c0cc72451 Mon Sep 17 00:00:00 2001 From: Steve Kriss Date: Thu, 14 Sep 2017 14:27:31 -0700 Subject: [PATCH] switch logging to logrus and errors to pkg/errors Signed-off-by: Steve Kriss --- pkg/backup/backup.go | 23 ++-- pkg/backup/volume_snapshot_action.go | 5 +- pkg/client/client.go | 4 +- pkg/client/dynamic.go | 6 +- pkg/client/factory.go | 3 +- .../aws/block_storage_adapter.go | 30 ++-- .../aws/object_storage_adapter.go | 14 +- .../azure/block_storage_adapter.go | 20 +-- .../azure/object_storage_adapter.go | 22 +-- pkg/cloudprovider/backup_cache.go | 17 ++- pkg/cloudprovider/backup_cache_test.go | 31 +++-- pkg/cloudprovider/backup_service.go | 42 ++++-- pkg/cloudprovider/backup_service_test.go | 54 +++++--- .../gcp/block_storage_adapter.go | 29 ++-- .../gcp/object_storage_adapter.go | 16 +-- pkg/cloudprovider/snapshot_service.go | 5 +- pkg/cmd/cli/backup/create.go | 2 +- pkg/cmd/cli/backup/download.go | 4 +- pkg/cmd/cli/backup/logs.go | 2 +- pkg/cmd/cli/restore/create.go | 2 +- pkg/cmd/cli/restore/logs.go | 2 +- pkg/cmd/cli/schedule/create.go | 2 +- pkg/cmd/server/server.go | 103 ++++++++------ pkg/cmd/server/server_test.go | 11 +- .../util/downloadrequest/downloadrequest.go | 13 +- pkg/cmd/util/flag/enum.go | 4 +- pkg/cmd/util/flag/map.go | 4 +- pkg/cmd/util/output/output.go | 5 +- pkg/controller/backup_controller.go | 107 +++++++------- pkg/controller/backup_controller_test.go | 19 ++- pkg/controller/backup_sync_controller.go | 34 +++-- pkg/controller/backup_sync_controller_test.go | 9 +- pkg/controller/download_request_controller.go | 86 ++++++------ .../download_request_controller_test.go | 20 ++- pkg/controller/gc_controller.go | 49 ++++--- pkg/controller/gc_controller_test.go | 7 + pkg/controller/restore_controller.go | 130 ++++++++++-------- pkg/controller/restore_controller_test.go | 17 ++- pkg/controller/schedule_controller.go | 93 +++++++------ pkg/controller/schedule_controller_test.go | 7 +- pkg/discovery/helper.go | 13 +- pkg/restore/resource_waiter.go | 6 +- pkg/restore/restore.go | 21 +-- pkg/restore/restore_test.go | 5 +- pkg/restore/restorers/job_restorer.go | 36 ++--- pkg/restore/restorers/job_restorer_test.go | 6 +- pkg/restore/restorers/pod_restorer.go | 40 +++--- pkg/restore/restorers/pod_restorer_test.go | 6 +- pkg/restore/restorers/pv_restorer.go | 5 +- pkg/util/collections/includes_excludes.go | 21 +-- .../collections/includes_excludes_test.go | 10 +- pkg/util/collections/map_utils.go | 16 +-- pkg/util/encode/encode.go | 6 +- pkg/util/kube/utils.go | 5 +- .../pkg/kubectl/cmd/util/shortcut_expander.go | 7 +- 55 files changed, 713 insertions(+), 543 deletions(-) diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go index e1fdfcf9f..112ff27ff 100644 --- a/pkg/backup/backup.go +++ b/pkg/backup/backup.go @@ -25,6 +25,8 @@ import ( "strings" "time" + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -112,13 +114,14 @@ func (ctx *backupContext) getResourceIncludesExcludes(helper discovery.Helper, i return collections.GenerateIncludesExcludes( includes, excludes, - func(item string) (string, error) { + func(item string) string { gr, err := helper.ResolveGroupResource(item) if err != nil { - return "", err + ctx.log("Unable to resolve resource %q: %v", item, err) + return "" } - return gr.String(), nil + return gr.String() }, ) } @@ -238,7 +241,7 @@ func (kb *kubernetesBackupper) backupResource( gv, err := 
schema.ParseGroupVersion(group.GroupVersion) if err != nil { - return err + return errors.Wrapf(err, "error parsing GroupVersion %s", group.GroupVersion) } gvr := schema.GroupVersionResource{Group: gv.Group, Version: gv.Version} gr := schema.GroupResource{Group: gv.Group, Resource: resource.Name} @@ -297,13 +300,13 @@ func (kb *kubernetesBackupper) backupResource( } unstructuredList, err := resourceClient.List(metav1.ListOptions{LabelSelector: labelSelector}) if err != nil { - return err + return errors.WithStack(err) } // do the backup items, err := meta.ExtractList(unstructuredList) if err != nil { - return err + return errors.WithStack(err) } action := kb.actions[gr] @@ -311,7 +314,7 @@ func (kb *kubernetesBackupper) backupResource( for _, item := range items { unstructured, ok := item.(runtime.Unstructured) if !ok { - errs = append(errs, fmt.Errorf("unexpected type %T", item)) + errs = append(errs, errors.Errorf("unexpected type %T", item)) continue } @@ -399,7 +402,7 @@ func (*realItemBackupper) backupItem(ctx *backupContext, item map[string]interfa itemBytes, err := json.Marshal(item) if err != nil { - return err + return errors.WithStack(err) } hdr := &tar.Header{ @@ -411,11 +414,11 @@ func (*realItemBackupper) backupItem(ctx *backupContext, item map[string]interfa } if err := ctx.w.WriteHeader(hdr); err != nil { - return err + return errors.WithStack(err) } if _, err := ctx.w.Write(itemBytes); err != nil { - return err + return errors.WithStack(err) } return nil diff --git a/pkg/backup/volume_snapshot_action.go b/pkg/backup/volume_snapshot_action.go index 2aebd4bf2..36a913ddc 100644 --- a/pkg/backup/volume_snapshot_action.go +++ b/pkg/backup/volume_snapshot_action.go @@ -17,9 +17,10 @@ limitations under the License. package backup import ( - "errors" "fmt" + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/util/clock" api "github.com/heptio/ark/pkg/apis/ark/v1" @@ -63,7 +64,7 @@ func (a *volumeSnapshotAction) Execute(ctx ActionContext, volume map[string]inte volumeID, err := kubeutil.GetVolumeID(volume) // non-nil error means it's a supported PV source but volume ID can't be found if err != nil { - return fmt.Errorf("error getting volume ID for backup %q, PersistentVolume %q: %v", backupName, name, err) + return errors.Wrapf(err, "error getting volume ID for backup %q, PersistentVolume %q", backupName, name) } // no volumeID / nil error means unsupported PV source if volumeID == "" { diff --git a/pkg/client/client.go b/pkg/client/client.go index 10a363205..a69f59f73 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -20,6 +20,8 @@ import ( "fmt" "runtime" + "github.com/pkg/errors" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -33,7 +35,7 @@ func Config(kubeconfig, baseName string) (*rest.Config, error) { loader.ExplicitPath = kubeconfig clientConfig, err := clientcmd.BuildConfigFromKubeconfigGetter("", loader.Load) if err != nil { - return nil, err + return nil, errors.WithStack(err) } clientConfig.UserAgent = buildUserAgent( diff --git a/pkg/client/dynamic.go b/pkg/client/dynamic.go index 5449eb3a5..7db557319 100644 --- a/pkg/client/dynamic.go +++ b/pkg/client/dynamic.go @@ -17,6 +17,8 @@ limitations under the License. 
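// Illustrative sketch, outside the patch, of the error-wrapping convention the
// hunks above apply, using the real github.com/pkg/errors API. The
// loadBackupObject helper and its getObject parameter are hypothetical names,
// not part of this commit.
package sketch

import (
	"io"
	"io/ioutil"

	"github.com/pkg/errors"
)

func loadBackupObject(getObject func(bucket, key string) (io.ReadCloser, error), bucket, key string) ([]byte, error) {
	rc, err := getObject(bucket, key)
	if err != nil {
		// Wrapf adds context plus a stack trace; used where the caller needs
		// to know which object or resource failed.
		return nil, errors.Wrapf(err, "error getting object %s", key)
	}
	defer rc.Close()

	data, err := ioutil.ReadAll(rc)
	if err != nil {
		// WithStack keeps the original message and only records the stack
		// trace; this is the replacement for most plain "return err" sites.
		return nil, errors.WithStack(err)
	}
	if len(data) == 0 {
		// Errorf replaces fmt.Errorf for newly created errors.
		return nil, errors.Errorf("object %s/%s is empty", bucket, key)
	}
	return data, nil
}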
package client import ( + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -51,7 +53,7 @@ func NewDynamicFactory(clientPool dynamic.ClientPool) DynamicFactory { func (f *dynamicFactory) ClientForGroupVersionResource(gvr schema.GroupVersionResource, resource metav1.APIResource, namespace string) (Dynamic, error) { dynamicClient, err := f.clientPool.ClientForGroupVersionResource(gvr) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "error getting client for GroupVersionResource %s", gvr) } return &dynamicResourceClient{ @@ -62,7 +64,7 @@ func (f *dynamicFactory) ClientForGroupVersionResource(gvr schema.GroupVersionRe func (f *dynamicFactory) ClientForGroupVersionKind(gvk schema.GroupVersionKind, resource metav1.APIResource, namespace string) (Dynamic, error) { dynamicClient, err := f.clientPool.ClientForGroupVersionKind(gvk) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "error getting client for GroupVersionKind %s", gvk) } return &dynamicResourceClient{ diff --git a/pkg/client/factory.go b/pkg/client/factory.go index a84a2b866..d01106b5b 100644 --- a/pkg/client/factory.go +++ b/pkg/client/factory.go @@ -17,6 +17,7 @@ limitations under the License. package client import ( + "github.com/pkg/errors" "github.com/spf13/pflag" "github.com/heptio/ark/pkg/generated/clientset" @@ -60,7 +61,7 @@ func (f *factory) Client() (clientset.Interface, error) { arkClient, err := clientset.NewForConfig(clientConfig) if err != nil { - return nil, err + return nil, errors.WithStack(err) } return arkClient, nil } diff --git a/pkg/cloudprovider/aws/block_storage_adapter.go b/pkg/cloudprovider/aws/block_storage_adapter.go index 052422147..8cbeb4671 100644 --- a/pkg/cloudprovider/aws/block_storage_adapter.go +++ b/pkg/cloudprovider/aws/block_storage_adapter.go @@ -17,12 +17,10 @@ limitations under the License. 
package aws import ( - "errors" - "fmt" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ec2" + "github.com/pkg/errors" "k8s.io/apimachinery/pkg/util/sets" @@ -39,11 +37,11 @@ type blockStorageAdapter struct { func getSession(config *aws.Config) (*session.Session, error) { sess, err := session.NewSession(config) if err != nil { - return nil, err + return nil, errors.WithStack(err) } if _, err := sess.Config.Credentials.Get(); err != nil { - return nil, err + return nil, errors.WithStack(err) } return sess, nil @@ -71,10 +69,10 @@ func NewBlockStorageAdapter(region, availabilityZone string) (cloudprovider.Bloc ) res, err := ec2Client.DescribeAvailabilityZones(azReq) if err != nil { - return nil, err + return nil, errors.WithStack(err) } if len(res.AvailabilityZones) == 0 { - return nil, fmt.Errorf("availability zone %q not found", availabilityZone) + return nil, errors.Errorf("availability zone %q not found", availabilityZone) } return &blockStorageAdapter{ @@ -101,7 +99,7 @@ func (op *blockStorageAdapter) CreateVolumeFromSnapshot(snapshotID, volumeType s res, err := op.ec2.CreateVolume(req) if err != nil { - return "", err + return "", errors.WithStack(err) } return *res.VolumeId, nil @@ -114,11 +112,11 @@ func (op *blockStorageAdapter) GetVolumeInfo(volumeID string) (string, *int64, e res, err := op.ec2.DescribeVolumes(req) if err != nil { - return "", nil, err + return "", nil, errors.WithStack(err) } if len(res.Volumes) != 1 { - return "", nil, fmt.Errorf("Expected one volume from DescribeVolumes for volume ID %v, got %v", volumeID, len(res.Volumes)) + return "", nil, errors.Errorf("Expected one volume from DescribeVolumes for volume ID %v, got %v", volumeID, len(res.Volumes)) } vol := res.Volumes[0] @@ -146,10 +144,10 @@ func (op *blockStorageAdapter) IsVolumeReady(volumeID string) (ready bool, err e res, err := op.ec2.DescribeVolumes(req) if err != nil { - return false, err + return false, errors.WithStack(err) } if len(res.Volumes) != 1 { - return false, fmt.Errorf("Expected one volume from DescribeVolumes for volume ID %v, got %v", volumeID, len(res.Volumes)) + return false, errors.Errorf("Expected one volume from DescribeVolumes for volume ID %v, got %v", volumeID, len(res.Volumes)) } return *res.Volumes[0].State == ec2.VolumeStateAvailable, nil @@ -175,7 +173,7 @@ func (op *blockStorageAdapter) ListSnapshots(tagFilters map[string]string) ([]st return !lastPage }) if err != nil { - return nil, err + return nil, errors.WithStack(err) } return ret, nil @@ -188,7 +186,7 @@ func (op *blockStorageAdapter) CreateSnapshot(volumeID string, tags map[string]s res, err := op.ec2.CreateSnapshot(req) if err != nil { - return "", err + return "", errors.WithStack(err) } tagsReq := &ec2.CreateTagsInput{} @@ -208,7 +206,7 @@ func (op *blockStorageAdapter) CreateSnapshot(volumeID string, tags map[string]s _, err = op.ec2.CreateTags(tagsReq) - return *res.SnapshotId, err + return *res.SnapshotId, errors.WithStack(err) } func (op *blockStorageAdapter) DeleteSnapshot(snapshotID string) error { @@ -218,5 +216,5 @@ func (op *blockStorageAdapter) DeleteSnapshot(snapshotID string) error { _, err := op.ec2.DeleteSnapshot(req) - return err + return errors.WithStack(err) } diff --git a/pkg/cloudprovider/aws/object_storage_adapter.go b/pkg/cloudprovider/aws/object_storage_adapter.go index cb493daa6..3b7bfb744 100644 --- a/pkg/cloudprovider/aws/object_storage_adapter.go +++ b/pkg/cloudprovider/aws/object_storage_adapter.go @@ -17,14 +17,13 @@ 
limitations under the License. package aws import ( - "errors" "io" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/endpoints" "github.com/aws/aws-sdk-go/service/s3" - "github.com/golang/glog" + "github.com/pkg/errors" "github.com/heptio/ark/pkg/cloudprovider" ) @@ -85,7 +84,7 @@ func (op *objectStorageAdapter) PutObject(bucket string, key string, body io.Rea _, err := op.s3.PutObject(req) - return err + return errors.Wrapf(err, "error putting object %s", key) } func (op *objectStorageAdapter) GetObject(bucket string, key string) (io.ReadCloser, error) { @@ -96,7 +95,7 @@ func (op *objectStorageAdapter) GetObject(bucket string, key string) (io.ReadClo res, err := op.s3.GetObject(req) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "error getting object %s", key) } return res.Body, nil @@ -117,7 +116,7 @@ func (op *objectStorageAdapter) ListCommonPrefixes(bucket string, delimiter stri }) if err != nil { - return nil, err + return nil, errors.WithStack(err) } return ret, nil @@ -138,7 +137,7 @@ func (op *objectStorageAdapter) ListObjects(bucket, prefix string) ([]string, er }) if err != nil { - return nil, err + return nil, errors.WithStack(err) } return ret, nil @@ -152,11 +151,10 @@ func (op *objectStorageAdapter) DeleteObject(bucket string, key string) error { _, err := op.s3.DeleteObject(req) - return err + return errors.Wrapf(err, "error deleting object %s", key) } func (op *objectStorageAdapter) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) { - glog.V(4).Infof("CreateSignedURL: bucket=%s, key=%s, ttl=%d", bucket, key, ttl) req, _ := op.s3.GetObjectRequest(&s3.GetObjectInput{ Bucket: aws.String(bucket), Key: aws.String(key), diff --git a/pkg/cloudprovider/azure/block_storage_adapter.go b/pkg/cloudprovider/azure/block_storage_adapter.go index 62cad92fc..d22d6ec81 100644 --- a/pkg/cloudprovider/azure/block_storage_adapter.go +++ b/pkg/cloudprovider/azure/block_storage_adapter.go @@ -18,7 +18,6 @@ package azure import ( "context" - "errors" "fmt" "os" "time" @@ -28,6 +27,7 @@ import ( "github.com/Azure/azure-sdk-for-go/arm/resources/subscriptions" "github.com/Azure/go-autorest/autorest" "github.com/Azure/go-autorest/autorest/azure" + "github.com/pkg/errors" "github.com/satori/uuid" "github.com/heptio/ark/pkg/cloudprovider" @@ -85,7 +85,7 @@ func NewBlockStorageAdapter(location string, apiTimeout time.Duration) (cloudpro spt, err := helpers.NewServicePrincipalTokenFromCredentials(cfg, azure.PublicCloud.ResourceManagerEndpoint) if err != nil { - return nil, fmt.Errorf("error creating new service principal: %v", err) + return nil, errors.Wrap(err, "error creating new service principal token") } disksClient := disk.NewDisksClient(cfg[azureSubscriptionIDKey]) @@ -101,7 +101,7 @@ func NewBlockStorageAdapter(location string, apiTimeout time.Duration) (cloudpro locs, err := groupClient.ListLocations(cfg[azureSubscriptionIDKey]) if err != nil { - return nil, err + return nil, errors.WithStack(err) } if locs.Value == nil { @@ -117,7 +117,7 @@ func NewBlockStorageAdapter(location string, apiTimeout time.Duration) (cloudpro } if !locationExists { - return nil, fmt.Errorf("location %q not found", location) + return nil, errors.Errorf("location %q not found", location) } return &blockStorageAdapter{ @@ -154,7 +154,7 @@ func (op *blockStorageAdapter) CreateVolumeFromSnapshot(snapshotID, volumeType s err := <-errChan if err != nil { - return "", err + return "", errors.WithStack(err) } return diskName, nil } @@ -162,7 +162,7 @@ func (op 
*blockStorageAdapter) CreateVolumeFromSnapshot(snapshotID, volumeType s func (op *blockStorageAdapter) GetVolumeInfo(volumeID string) (string, *int64, error) { res, err := op.disks.Get(op.resourceGroup, volumeID) if err != nil { - return "", nil, err + return "", nil, errors.WithStack(err) } return string(res.AccountType), nil, nil @@ -171,7 +171,7 @@ func (op *blockStorageAdapter) GetVolumeInfo(volumeID string) (string, *int64, e func (op *blockStorageAdapter) IsVolumeReady(volumeID string) (ready bool, err error) { res, err := op.disks.Get(op.resourceGroup, volumeID) if err != nil { - return false, err + return false, errors.WithStack(err) } if res.ProvisioningState == nil { @@ -184,7 +184,7 @@ func (op *blockStorageAdapter) IsVolumeReady(volumeID string) (ready bool, err e func (op *blockStorageAdapter) ListSnapshots(tagFilters map[string]string) ([]string, error) { res, err := op.snaps.ListByResourceGroup(op.resourceGroup) if err != nil { - return nil, err + return nil, errors.WithStack(err) } if res.Value == nil { @@ -252,7 +252,7 @@ func (op *blockStorageAdapter) CreateSnapshot(volumeID string, tags map[string]s err := <-errChan if err != nil { - return "", err + return "", errors.WithStack(err) } return snapshotName, nil @@ -266,7 +266,7 @@ func (op *blockStorageAdapter) DeleteSnapshot(snapshotID string) error { err := <-errChan - return err + return errors.WithStack(err) } func getFullDiskName(subscription string, resourceGroup string, diskName string) string { diff --git a/pkg/cloudprovider/azure/object_storage_adapter.go b/pkg/cloudprovider/azure/object_storage_adapter.go index daaca2b71..d05663518 100644 --- a/pkg/cloudprovider/azure/object_storage_adapter.go +++ b/pkg/cloudprovider/azure/object_storage_adapter.go @@ -17,12 +17,12 @@ limitations under the License. package azure import ( - "fmt" "io" "strings" "time" "github.com/Azure/azure-sdk-for-go/storage" + "github.com/pkg/errors" "github.com/heptio/ark/pkg/cloudprovider" ) @@ -40,7 +40,7 @@ func NewObjectStorageAdapter() (cloudprovider.ObjectStorageAdapter, error) { storageClient, err := storage.NewBasicClient(cfg[azureStorageAccountIDKey], cfg[azureStorageKeyKey]) if err != nil { - return nil, err + return nil, errors.WithStack(err) } blobClient := storageClient.GetBlobService() @@ -65,16 +65,16 @@ func (op *objectStorageAdapter) PutObject(bucket string, key string, body io.Rea // length here is ugly. refactor to make this better. len, err := body.Seek(0, io.SeekEnd) if err != nil { - return err + return errors.WithStack(err) } blob.Properties.ContentLength = len if _, err := body.Seek(0, 0); err != nil { - return err + return errors.WithStack(err) } - return blob.CreateBlockBlobFromReader(body, nil) + return errors.WithStack(blob.CreateBlockBlobFromReader(body, nil)) } func (op *objectStorageAdapter) GetObject(bucket string, key string) (io.ReadCloser, error) { @@ -90,7 +90,7 @@ func (op *objectStorageAdapter) GetObject(bucket string, key string) (io.ReadClo res, err := blob.Get(nil) if err != nil { - return nil, err + return nil, errors.WithStack(err) } return res, nil @@ -108,7 +108,7 @@ func (op *objectStorageAdapter) ListCommonPrefixes(bucket string, delimiter stri res, err := container.ListBlobs(params) if err != nil { - return nil, err + return nil, errors.WithStack(err) } // Azure returns prefixes inclusive of the last delimiter. 
We need to strip @@ -133,7 +133,7 @@ func (op *objectStorageAdapter) ListObjects(bucket, prefix string) ([]string, er res, err := container.ListBlobs(params) if err != nil { - return nil, err + return nil, errors.WithStack(err) } ret := make([]string, 0, len(res.Blobs)) @@ -155,7 +155,7 @@ func (op *objectStorageAdapter) DeleteObject(bucket string, key string) error { return err } - return blob.Delete(nil) + return errors.WithStack(blob.Delete(nil)) } const sasURIReadPermission = "r" @@ -177,7 +177,7 @@ func (op *objectStorageAdapter) CreateSignedURL(bucket, key string, ttl time.Dur func getContainerReference(blobClient *storage.BlobStorageClient, bucket string) (*storage.Container, error) { container := blobClient.GetContainerReference(bucket) if container == nil { - return nil, fmt.Errorf("unable to get container reference for bucket %v", bucket) + return nil, errors.Errorf("unable to get container reference for bucket %v", bucket) } return container, nil @@ -186,7 +186,7 @@ func getContainerReference(blobClient *storage.BlobStorageClient, bucket string) func getBlobReference(container *storage.Container, key string) (*storage.Blob, error) { blob := container.GetBlobReference(key) if blob == nil { - return nil, fmt.Errorf("unable to get blob reference for key %v", key) + return nil, errors.Errorf("unable to get blob reference for key %v", key) } return blob, nil diff --git a/pkg/cloudprovider/backup_cache.go b/pkg/cloudprovider/backup_cache.go index 493aeb4c2..bfd671e7b 100644 --- a/pkg/cloudprovider/backup_cache.go +++ b/pkg/cloudprovider/backup_cache.go @@ -21,7 +21,7 @@ import ( "sync" "time" - "github.com/golang/glog" + "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/wait" @@ -41,15 +41,17 @@ type backupCache struct { // This doesn't really need to be a map right now, but if we ever move to supporting multiple // buckets, this will be ready for it. buckets map[string]*backupCacheBucket + logger *logrus.Logger } var _ BackupGetter = &backupCache{} // NewBackupCache returns a new backup cache that refreshes from delegate every resyncPeriod. -func NewBackupCache(ctx context.Context, delegate BackupGetter, resyncPeriod time.Duration) BackupGetter { +func NewBackupCache(ctx context.Context, delegate BackupGetter, resyncPeriod time.Duration, logger *logrus.Logger) BackupGetter { c := &backupCache{ delegate: delegate, buckets: make(map[string]*backupCacheBucket), + logger: logger, } // Start the goroutine to refresh all buckets every resyncPeriod. 
This stops when ctx.Done() is @@ -64,10 +66,10 @@ func (c *backupCache) refresh() { c.lock.Lock() defer c.lock.Unlock() - glog.V(4).Infof("refreshing all cached backup lists from object storage") + c.logger.Debug("refreshing all cached backup lists from object storage") for bucketName, bucket := range c.buckets { - glog.V(4).Infof("refreshing bucket %q", bucketName) + c.logger.WithField("bucket", bucketName).Debug("Refreshing bucket") bucket.backups, bucket.error = c.delegate.GetAllBackups(bucketName) } } @@ -76,12 +78,15 @@ func (c *backupCache) GetAllBackups(bucketName string) ([]*v1.Backup, error) { c.lock.RLock() bucket, found := c.buckets[bucketName] c.lock.RUnlock() + + logContext := c.logger.WithField("bucket", bucketName) + if found { - glog.V(4).Infof("returning cached backup list for bucket %q", bucketName) + logContext.Debug("Returning cached backup list") return bucket.backups, bucket.error } - glog.V(4).Infof("bucket %q is not in cache - doing a live lookup", bucketName) + logContext.Debug("Bucket is not in cache - doing a live lookup") backups, err := c.delegate.GetAllBackups(bucketName) c.lock.Lock() diff --git a/pkg/cloudprovider/backup_cache_test.go b/pkg/cloudprovider/backup_cache_test.go index 50fd23d34..373750d60 100644 --- a/pkg/cloudprovider/backup_cache_test.go +++ b/pkg/cloudprovider/backup_cache_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + testlogger "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/heptio/ark/pkg/apis/ark/v1" @@ -29,12 +30,15 @@ import ( ) func TestNewBackupCache(t *testing.T) { - delegate := &test.FakeBackupService{} - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + var ( + delegate = &test.FakeBackupService{} + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + logger, _ = testlogger.NewNullLogger() + ) defer cancel() - c := NewBackupCache(ctx, delegate, 100*time.Millisecond) + c := NewBackupCache(ctx, delegate, 100*time.Millisecond, logger) // nothing in cache, live lookup bucket1 := []*v1.Backup{ @@ -99,7 +103,10 @@ func TestNewBackupCache(t *testing.T) { } func TestBackupCacheRefresh(t *testing.T) { - delegate := &test.FakeBackupService{} + var ( + delegate = &test.FakeBackupService{} + logger, _ = testlogger.NewNullLogger() + ) c := &backupCache{ delegate: delegate, @@ -107,6 +114,7 @@ func TestBackupCacheRefresh(t *testing.T) { "bucket1": {}, "bucket2": {}, }, + logger: logger, } bucket1 := []*v1.Backup{ @@ -127,12 +135,14 @@ func TestBackupCacheRefresh(t *testing.T) { } func TestBackupCacheGetAllBackupsUsesCacheIfPresent(t *testing.T) { - delegate := &test.FakeBackupService{} - - bucket1 := []*v1.Backup{ - test.NewTestBackup().WithName("backup1").Backup, - test.NewTestBackup().WithName("backup2").Backup, - } + var ( + delegate = &test.FakeBackupService{} + logger, _ = testlogger.NewNullLogger() + bucket1 = []*v1.Backup{ + test.NewTestBackup().WithName("backup1").Backup, + test.NewTestBackup().WithName("backup2").Backup, + } + ) c := &backupCache{ delegate: delegate, @@ -141,6 +151,7 @@ func TestBackupCacheGetAllBackupsUsesCacheIfPresent(t *testing.T) { backups: bucket1, }, }, + logger: logger, } bucket2 := []*v1.Backup{ diff --git a/pkg/cloudprovider/backup_service.go b/pkg/cloudprovider/backup_service.go index 521e32786..1925cff1b 100644 --- a/pkg/cloudprovider/backup_service.go +++ b/pkg/cloudprovider/backup_service.go @@ -24,10 +24,11 @@ import ( "strings" "time" - "github.com/golang/glog" + "github.com/pkg/errors" + 
"github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/errors" + kerrors "k8s.io/apimachinery/pkg/util/errors" api "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/generated/clientset/scheme" @@ -92,16 +93,18 @@ func getRestoreLogKey(backup, restore string) string { type backupService struct { objectStorage ObjectStorageAdapter decoder runtime.Decoder + logger *logrus.Logger } var _ BackupService = &backupService{} var _ BackupGetter = &backupService{} // NewBackupService creates a backup service using the provided object storage adapter -func NewBackupService(objectStorage ObjectStorageAdapter) BackupService { +func NewBackupService(objectStorage ObjectStorageAdapter, logger *logrus.Logger) BackupService { return &backupService{ objectStorage: objectStorage, decoder: scheme.Codecs.UniversalDecoder(api.SchemeGroupVersion), + logger: logger, } } @@ -118,14 +121,17 @@ func (br *backupService) UploadBackup(bucket, backupName string, metadata, backu // try to delete the metadata file since the data upload failed deleteErr := br.objectStorage.DeleteObject(bucket, metadataKey) - return errors.NewAggregate([]error{err, deleteErr}) + return kerrors.NewAggregate([]error{err, deleteErr}) } // uploading log file is best-effort; if it fails, we log the error but call the overall upload a // success logKey := getBackupLogKey(backupName) if err := br.objectStorage.PutObject(bucket, logKey, log); err != nil { - glog.Errorf("error uploading %s/%s: %v", bucket, logKey, err) + br.logger.WithError(err).WithFields(logrus.Fields{ + "bucket": bucket, + "key": logKey, + }).Error("Error uploading log file") } return nil @@ -149,7 +155,7 @@ func (br *backupService) GetAllBackups(bucket string) ([]*api.Backup, error) { for _, backupDir := range prefixes { backup, err := br.GetBackup(bucket, backupDir) if err != nil { - glog.Errorf("Error reading backup directory %s: %v", backupDir, err) + br.logger.WithError(err).WithField("dir", backupDir).Error("Error reading backup directory") continue } @@ -170,17 +176,17 @@ func (br *backupService) GetBackup(bucket, name string) (*api.Backup, error) { data, err := ioutil.ReadAll(res) if err != nil { - return nil, err + return nil, errors.WithStack(err) } obj, _, err := br.decoder.Decode(data, nil, nil) if err != nil { - return nil, err + return nil, errors.WithStack(err) } backup, ok := obj.(*api.Backup) if !ok { - return nil, fmt.Errorf("unexpected type for %s/%s: %T", bucket, key, obj) + return nil, errors.Errorf("unexpected type for %s/%s: %T", bucket, key, obj) } return backup, nil @@ -194,13 +200,16 @@ func (br *backupService) DeleteBackupDir(bucket, backupName string) error { var errs []error for _, key := range objects { - glog.V(4).Infof("Trying to delete bucket=%s, key=%s", bucket, key) + br.logger.WithFields(logrus.Fields{ + "bucket": bucket, + "key": key, + }).Debug("Trying to delete object") if err := br.objectStorage.DeleteObject(bucket, key); err != nil { errs = append(errs, err) } } - return errors.NewAggregate(errs) + return errors.WithStack(kerrors.NewAggregate(errs)) } func (br *backupService) CreateSignedURL(target api.DownloadTarget, bucket string, ttl time.Duration) (string, error) { @@ -218,7 +227,7 @@ func (br *backupService) CreateSignedURL(target api.DownloadTarget, bucket strin backup := target.Name[0:i] return br.objectStorage.CreateSignedURL(bucket, getRestoreLogKey(backup, target.Name), ttl) default: - return "", fmt.Errorf("unsupported download target kind %q", target.Kind) + return "", 
errors.Errorf("unsupported download target kind %q", target.Kind) } } @@ -235,10 +244,15 @@ type cachedBackupService struct { // NewBackupServiceWithCachedBackupGetter returns a BackupService that uses a cache for // GetAllBackups(). -func NewBackupServiceWithCachedBackupGetter(ctx context.Context, delegate BackupService, resyncPeriod time.Duration) BackupService { +func NewBackupServiceWithCachedBackupGetter( + ctx context.Context, + delegate BackupService, + resyncPeriod time.Duration, + logger *logrus.Logger, +) BackupService { return &cachedBackupService{ BackupService: delegate, - cache: NewBackupCache(ctx, delegate, resyncPeriod), + cache: NewBackupCache(ctx, delegate, resyncPeriod, logger), } } diff --git a/pkg/cloudprovider/backup_service_test.go b/pkg/cloudprovider/backup_service_test.go index afb705d92..b7a7af5e1 100644 --- a/pkg/cloudprovider/backup_service_test.go +++ b/pkg/cloudprovider/backup_service_test.go @@ -27,6 +27,7 @@ import ( "time" testutil "github.com/heptio/ark/pkg/util/test" + testlogger "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -80,10 +81,12 @@ func TestUploadBackup(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - objStore := &testutil.ObjectStorageAdapter{} - - bucket := "test-bucket" - backupName := "test-backup" + var ( + objStore = &testutil.ObjectStorageAdapter{} + bucket = "test-bucket" + backupName = "test-backup" + logger, _ = testlogger.NewNullLogger() + ) if test.metadata != nil { objStore.On("PutObject", bucket, backupName+"/ark-backup.json", test.metadata).Return(test.metadataError) @@ -98,7 +101,7 @@ func TestUploadBackup(t *testing.T) { objStore.On("DeleteObject", bucket, backupName+"/ark-backup.json").Return(nil) } - backupService := NewBackupService(objStore) + backupService := NewBackupService(objStore, logger) err := backupService.UploadBackup(bucket, backupName, test.metadata, test.backup, test.log) @@ -114,11 +117,15 @@ func TestUploadBackup(t *testing.T) { } func TestDownloadBackup(t *testing.T) { - o := &testutil.ObjectStorageAdapter{} - bucket := "b" - backup := "bak" + var ( + o = &testutil.ObjectStorageAdapter{} + bucket = "b" + backup = "bak" + logger, _ = testlogger.NewNullLogger() + ) o.On("GetObject", bucket, backup+"/"+backup+".tar.gz").Return(ioutil.NopCloser(strings.NewReader("foo")), nil) - s := NewBackupService(o) + + s := NewBackupService(o, logger) rc, err := s.DownloadBackup(bucket, backup) require.NoError(t, err) require.NotNil(t, rc) @@ -147,11 +154,14 @@ func TestDeleteBackup(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - bucket := "bucket" - backup := "bak" - objects := []string{"bak/ark-backup.json", "bak/bak.tar.gz", "bak/bak.log.gz"} + var ( + bucket = "bucket" + backup = "bak" + objects = []string{"bak/ark-backup.json", "bak/bak.tar.gz", "bak/bak.log.gz"} + objStore = &testutil.ObjectStorageAdapter{} + logger, _ = testlogger.NewNullLogger() + ) - objStore := &testutil.ObjectStorageAdapter{} objStore.On("ListObjects", bucket, backup+"/").Return(objects, test.listObjectsError) for i, o := range objects { var err error @@ -162,7 +172,7 @@ func TestDeleteBackup(t *testing.T) { objStore.On("DeleteObject", bucket, o).Return(err) } - backupService := NewBackupService(objStore) + backupService := NewBackupService(objStore, logger) err := backupService.DeleteBackupDir(bucket, backup) @@ -218,14 +228,17 @@ func TestGetAllBackups(t *testing.T) { for _, test := range tests { 
t.Run(test.name, func(t *testing.T) { - bucket := "bucket" + var ( + bucket = "bucket" + objStore = &testutil.ObjectStorageAdapter{} + logger, _ = testlogger.NewNullLogger() + ) - objStore := &testutil.ObjectStorageAdapter{} objStore.On("ListCommonPrefixes", bucket, "/").Return([]string{"backup-1", "backup-2"}, nil) objStore.On("GetObject", bucket, "backup-1/ark-backup.json").Return(ioutil.NopCloser(bytes.NewReader(test.storageData["backup-1/ark-backup.json"])), nil) objStore.On("GetObject", bucket, "backup-2/ark-backup.json").Return(ioutil.NopCloser(bytes.NewReader(test.storageData["backup-2/ark-backup.json"])), nil) - backupService := NewBackupService(objStore) + backupService := NewBackupService(objStore, logger) res, err := backupService.GetAllBackups(bucket) @@ -295,8 +308,11 @@ func TestCreateSignedURL(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - objectStorage := &testutil.ObjectStorageAdapter{} - backupService := NewBackupService(objectStorage) + var ( + objectStorage = &testutil.ObjectStorageAdapter{} + logger, _ = testlogger.NewNullLogger() + backupService = NewBackupService(objectStorage, logger) + ) target := api.DownloadTarget{ Kind: test.targetKind, diff --git a/pkg/cloudprovider/gcp/block_storage_adapter.go b/pkg/cloudprovider/gcp/block_storage_adapter.go index 8664e476a..b7ca50f2a 100644 --- a/pkg/cloudprovider/gcp/block_storage_adapter.go +++ b/pkg/cloudprovider/gcp/block_storage_adapter.go @@ -17,11 +17,10 @@ limitations under the License. package gcp import ( - "errors" - "fmt" "strings" "time" + "github.com/pkg/errors" uuid "github.com/satori/go.uuid" "golang.org/x/oauth2" "golang.org/x/oauth2/google" @@ -50,22 +49,22 @@ func NewBlockStorageAdapter(project, zone string) (cloudprovider.BlockStorageAda client, err := google.DefaultClient(oauth2.NoContext, compute.ComputeScope) if err != nil { - return nil, err + return nil, errors.WithStack(err) } gce, err := compute.New(client) if err != nil { - return nil, err + return nil, errors.WithStack(err) } // validate project & zone res, err := gce.Zones.Get(project, zone).Do() if err != nil { - return nil, err + return nil, errors.WithStack(err) } if res == nil { - return nil, fmt.Errorf("zone %q not found for project %q", project, zone) + return nil, errors.Errorf("zone %q not found for project %q", project, zone) } return &blockStorageAdapter{ @@ -78,7 +77,7 @@ func NewBlockStorageAdapter(project, zone string) (cloudprovider.BlockStorageAda func (op *blockStorageAdapter) CreateVolumeFromSnapshot(snapshotID string, volumeType string, iops *int64) (volumeID string, err error) { res, err := op.gce.Snapshots.Get(op.project, snapshotID).Do() if err != nil { - return "", err + return "", errors.WithStack(err) } disk := &compute.Disk{ @@ -88,7 +87,7 @@ func (op *blockStorageAdapter) CreateVolumeFromSnapshot(snapshotID string, volum } if _, err = op.gce.Disks.Insert(op.project, op.zone, disk).Do(); err != nil { - return "", err + return "", errors.WithStack(err) } return disk.Name, nil @@ -97,7 +96,7 @@ func (op *blockStorageAdapter) CreateVolumeFromSnapshot(snapshotID string, volum func (op *blockStorageAdapter) GetVolumeInfo(volumeID string) (string, *int64, error) { res, err := op.gce.Disks.Get(op.project, op.zone, volumeID).Do() if err != nil { - return "", nil, err + return "", nil, errors.WithStack(err) } return res.Type, nil, nil @@ -106,7 +105,7 @@ func (op *blockStorageAdapter) GetVolumeInfo(volumeID string) (string, *int64, e func (op *blockStorageAdapter) IsVolumeReady(volumeID 
string) (ready bool, err error) { disk, err := op.gce.Disks.Get(op.project, op.zone, volumeID).Do() if err != nil { - return false, err + return false, errors.WithStack(err) } // TODO can we consider a disk ready while it's in the RESTORING state? @@ -129,7 +128,7 @@ func (op *blockStorageAdapter) ListSnapshots(tagFilters map[string]string) ([]st res, err := op.gce.Snapshots.List(op.project).Filter(filter).Do() if err != nil { - return nil, err + return nil, errors.WithStack(err) } ret := make([]string, 0, len(res.Items)) @@ -158,7 +157,7 @@ func (op *blockStorageAdapter) CreateSnapshot(volumeID string, tags map[string]s _, err := op.gce.Disks.CreateSnapshot(op.project, op.zone, volumeID, &gceSnap).Do() if err != nil { - return "", err + return "", errors.WithStack(err) } // the snapshot is not immediately available after creation for putting labels @@ -170,7 +169,7 @@ func (op *blockStorageAdapter) CreateSnapshot(volumeID string, tags map[string]s } return false, nil }); pollErr != nil { - return "", err + return "", errors.WithStack(err) } labels := &compute.GlobalSetLabelsRequest{ @@ -180,7 +179,7 @@ func (op *blockStorageAdapter) CreateSnapshot(volumeID string, tags map[string]s _, err = op.gce.Snapshots.SetLabels(op.project, gceSnap.Name, labels).Do() if err != nil { - return "", err + return "", errors.WithStack(err) } return gceSnap.Name, nil @@ -189,5 +188,5 @@ func (op *blockStorageAdapter) CreateSnapshot(volumeID string, tags map[string]s func (op *blockStorageAdapter) DeleteSnapshot(snapshotID string) error { _, err := op.gce.Snapshots.Delete(op.project, snapshotID).Do() - return err + return errors.WithStack(err) } diff --git a/pkg/cloudprovider/gcp/object_storage_adapter.go b/pkg/cloudprovider/gcp/object_storage_adapter.go index 85ec13978..64c629ba4 100644 --- a/pkg/cloudprovider/gcp/object_storage_adapter.go +++ b/pkg/cloudprovider/gcp/object_storage_adapter.go @@ -17,11 +17,11 @@ limitations under the License. package gcp import ( - "errors" "io" "strings" "time" + "github.com/pkg/errors" "golang.org/x/oauth2" "golang.org/x/oauth2/google" // TODO switch to using newstorage @@ -42,12 +42,12 @@ var _ cloudprovider.ObjectStorageAdapter = &objectStorageAdapter{} func NewObjectStorageAdapter(googleAccessID string, privateKey []byte) (cloudprovider.ObjectStorageAdapter, error) { client, err := google.DefaultClient(oauth2.NoContext, storage.DevstorageReadWriteScope) if err != nil { - return nil, err + return nil, errors.WithStack(err) } gcs, err := storage.New(client) if err != nil { - return nil, err + return nil, errors.WithStack(err) } return &objectStorageAdapter{ @@ -64,13 +64,13 @@ func (op *objectStorageAdapter) PutObject(bucket string, key string, body io.Rea _, err := op.gcs.Objects.Insert(bucket, obj).Media(body).Do() - return err + return errors.WithStack(err) } func (op *objectStorageAdapter) GetObject(bucket string, key string) (io.ReadCloser, error) { res, err := op.gcs.Objects.Get(bucket, key).Download() if err != nil { - return nil, err + return nil, errors.WithStack(err) } return res.Body, nil @@ -79,7 +79,7 @@ func (op *objectStorageAdapter) GetObject(bucket string, key string) (io.ReadClo func (op *objectStorageAdapter) ListCommonPrefixes(bucket string, delimiter string) ([]string, error) { res, err := op.gcs.Objects.List(bucket).Delimiter(delimiter).Do() if err != nil { - return nil, err + return nil, errors.WithStack(err) } // GCP returns prefixes inclusive of the last delimiter. 
We need to strip @@ -95,7 +95,7 @@ func (op *objectStorageAdapter) ListCommonPrefixes(bucket string, delimiter stri func (op *objectStorageAdapter) ListObjects(bucket, prefix string) ([]string, error) { res, err := op.gcs.Objects.List(bucket).Prefix(prefix).Do() if err != nil { - return nil, err + return nil, errors.WithStack(err) } ret := make([]string, 0, len(res.Items)) @@ -107,7 +107,7 @@ func (op *objectStorageAdapter) ListObjects(bucket, prefix string) ([]string, er } func (op *objectStorageAdapter) DeleteObject(bucket string, key string) error { - return op.gcs.Objects.Delete(bucket, key).Do() + return errors.Wrapf(op.gcs.Objects.Delete(bucket, key).Do(), "error deleting object %s", key) } func (op *objectStorageAdapter) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) { diff --git a/pkg/cloudprovider/snapshot_service.go b/pkg/cloudprovider/snapshot_service.go index 34165ee01..6c3a7fc48 100644 --- a/pkg/cloudprovider/snapshot_service.go +++ b/pkg/cloudprovider/snapshot_service.go @@ -17,8 +17,9 @@ limitations under the License. package cloudprovider import ( - "fmt" "time" + + "github.com/pkg/errors" ) // SnapshotService exposes Ark-specific operations for snapshotting and restoring block @@ -82,7 +83,7 @@ func (sr *snapshotService) CreateVolumeFromSnapshot(snapshotID string, volumeTyp for { select { case <-timeout.C: - return "", fmt.Errorf("timeout reached waiting for volume %v to be ready", volumeID) + return "", errors.Errorf("timeout reached waiting for volume %v to be ready", volumeID) case <-ticker.C: if ready, err := sr.blockStorage.IsVolumeReady(volumeID); err == nil && ready { return volumeID, nil diff --git a/pkg/cmd/cli/backup/create.go b/pkg/cmd/cli/backup/create.go index e25c66526..a5656b642 100644 --- a/pkg/cmd/cli/backup/create.go +++ b/pkg/cmd/cli/backup/create.go @@ -17,10 +17,10 @@ limitations under the License. package backup import ( - "errors" "fmt" "time" + "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/pflag" diff --git a/pkg/cmd/cli/backup/download.go b/pkg/cmd/cli/backup/download.go index 6c4071d30..3c6cb29d0 100644 --- a/pkg/cmd/cli/backup/download.go +++ b/pkg/cmd/cli/backup/download.go @@ -17,12 +17,12 @@ limitations under the License. package backup import ( - "errors" "fmt" "os" "path/filepath" "time" + "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -88,7 +88,7 @@ func (o *DownloadOptions) Complete(args []string) error { if o.Output == "" { path, err := os.Getwd() if err != nil { - return fmt.Errorf("error getting current directory: %v", err) + return errors.Wrapf(err, "error getting current directory") } o.Output = filepath.Join(path, fmt.Sprintf("%s-data.tar.gz", o.Name)) } diff --git a/pkg/cmd/cli/backup/logs.go b/pkg/cmd/cli/backup/logs.go index 07ccd2d67..80c522ea1 100644 --- a/pkg/cmd/cli/backup/logs.go +++ b/pkg/cmd/cli/backup/logs.go @@ -17,10 +17,10 @@ limitations under the License. package backup import ( - "errors" "os" "time" + "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/heptio/ark/pkg/apis/ark/v1" diff --git a/pkg/cmd/cli/restore/create.go b/pkg/cmd/cli/restore/create.go index 4badeab55..6e98337e9 100644 --- a/pkg/cmd/cli/restore/create.go +++ b/pkg/cmd/cli/restore/create.go @@ -17,10 +17,10 @@ limitations under the License. 
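// Illustrative note, outside the patch: several hunks above wrap a possibly
// nil error directly in the return statement, e.g.
// "return *res.SnapshotId, errors.WithStack(err)". This relies on the documented
// behaviour of github.com/pkg/errors: WithStack, Wrap, and Wrapf all return nil
// when given a nil error, so the success path is unchanged. The deleteSnapshot
// helper and its do parameter below are hypothetical names.
package sketch

import "github.com/pkg/errors"

func deleteSnapshot(do func() error, snapshotID string) error {
	// Safe even when do() succeeds: Wrapf(nil, ...) == nil.
	return errors.Wrapf(do(), "error deleting snapshot %s", snapshotID)
}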
package restore import ( - "errors" "fmt" "time" + "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/pflag" diff --git a/pkg/cmd/cli/restore/logs.go b/pkg/cmd/cli/restore/logs.go index 5a378119f..96ffba811 100644 --- a/pkg/cmd/cli/restore/logs.go +++ b/pkg/cmd/cli/restore/logs.go @@ -17,10 +17,10 @@ limitations under the License. package restore import ( - "errors" "os" "time" + "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/heptio/ark/pkg/apis/ark/v1" diff --git a/pkg/cmd/cli/schedule/create.go b/pkg/cmd/cli/schedule/create.go index 674421f3c..e0ea3801d 100644 --- a/pkg/cmd/cli/schedule/create.go +++ b/pkg/cmd/cli/schedule/create.go @@ -17,9 +17,9 @@ limitations under the License. package schedule import ( - "errors" "fmt" + "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/pflag" diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 6da718e45..d1de9b1e3 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -25,10 +25,10 @@ import ( "sync" "time" - "golang.org/x/oauth2/google" - - "github.com/golang/glog" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "golang.org/x/oauth2/google" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -66,7 +66,10 @@ func NewCommand() *cobra.Command { Short: "Run the ark server", Long: "Run the ark server", Run: func(c *cobra.Command, args []string) { - s, err := newServer(kubeconfig, fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name())) + var ( + logger = logrus.New() + s, err = newServer(kubeconfig, fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()), logger) + ) cmd.CheckError(err) cmd.CheckError(s.run()) @@ -88,9 +91,10 @@ type server struct { sharedInformerFactory informers.SharedInformerFactory ctx context.Context cancelFunc context.CancelFunc + logger *logrus.Logger } -func newServer(kubeconfig, baseName string) (*server, error) { +func newServer(kubeconfig, baseName string, logger *logrus.Logger) (*server, error) { clientConfig, err := client.Config(kubeconfig, baseName) if err != nil { return nil, err @@ -98,12 +102,12 @@ func newServer(kubeconfig, baseName string) (*server, error) { kubeClient, err := kubernetes.NewForConfig(clientConfig) if err != nil { - return nil, err + return nil, errors.WithStack(err) } arkClient, err := clientset.NewForConfig(clientConfig) if err != nil { - return nil, err + return nil, errors.WithStack(err) } ctx, cancelFunc := context.WithCancel(context.Background()) @@ -116,6 +120,7 @@ func newServer(kubeconfig, baseName string) (*server, error) { sharedInformerFactory: informers.NewSharedInformerFactory(arkClient, 0), ctx: ctx, cancelFunc: cancelFunc, + logger: logger, } return s, nil @@ -135,10 +140,10 @@ func (s *server) run() error { // separate object, and instead apply defaults to a clone. 
copy, err := scheme.Scheme.DeepCopy(originalConfig) if err != nil { - return err + return errors.WithStack(err) } config := copy.(*api.Config) - applyConfigDefaults(config) + applyConfigDefaults(config, s.logger) s.watchConfig(originalConfig) @@ -158,7 +163,9 @@ func (s *server) run() error { } func (s *server) ensureArkNamespace() error { - glog.Infof("Ensuring %s namespace exists for backups", api.DefaultNamespace) + logContext := s.logger.WithField("namespace", api.DefaultNamespace) + + logContext.Info("Ensuring namespace exists for backups") defaultNamespace := v1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: api.DefaultNamespace, @@ -166,16 +173,16 @@ func (s *server) ensureArkNamespace() error { } if created, err := kube.EnsureNamespaceExists(&defaultNamespace, s.kubeClient.CoreV1().Namespaces()); created { - glog.Infof("Namespace created") + logContext.Info("Namespace created") } else if err != nil { return err } - glog.Infof("Namespace already exists") + logContext.Info("Namespace already exists") return nil } func (s *server) loadConfig() (*api.Config, error) { - glog.Infof("Retrieving Ark configuration") + s.logger.Info("Retrieving Ark configuration") var ( config *api.Config err error @@ -186,12 +193,14 @@ func (s *server) loadConfig() (*api.Config, error) { break } if !apierrors.IsNotFound(err) { - glog.Errorf("error retrieving configuration: %v", err) + s.logger.WithError(err).Error("Error retrieving configuration") + } else { + s.logger.Info("Configuration not found") } - glog.Infof("Will attempt to retrieve configuration again in 5 seconds") + s.logger.Info("Will attempt to retrieve configuration again in 5 seconds") time.Sleep(5 * time.Second) } - glog.Infof("Successfully retrieved Ark configuration") + s.logger.Info("Successfully retrieved Ark configuration") return config, nil } @@ -209,7 +218,7 @@ var defaultResourcePriorities = []string{ "configmaps", } -func applyConfigDefaults(c *api.Config) { +func applyConfigDefaults(c *api.Config, logger *logrus.Logger) { if c.GCSyncPeriod.Duration == 0 { c.GCSyncPeriod.Duration = defaultGCSyncPeriod } @@ -224,9 +233,9 @@ func applyConfigDefaults(c *api.Config) { if len(c.ResourcePriorities) == 0 { c.ResourcePriorities = defaultResourcePriorities - glog.Infof("Using default resource priorities: %v", c.ResourcePriorities) + logger.WithField("priorities", c.ResourcePriorities).Info("Using default resource priorities") } else { - glog.Infof("Using resource priorities from config: %v", c.ResourcePriorities) + logger.WithField("priorities", c.ResourcePriorities).Info("Using resource priorities from config") } } @@ -236,10 +245,10 @@ func (s *server) watchConfig(config *api.Config) { s.sharedInformerFactory.Ark().V1().Configs().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(oldObj, newObj interface{}) { updated := newObj.(*api.Config) - glog.V(4).Infof("received updated config: %q", kube.NamespaceAndName(updated)) + s.logger.WithField("name", kube.NamespaceAndName(updated)).Debug("received updated config") if updated.Name != config.Name { - glog.V(5).Infof("config watch channel received other config %q", updated.Name) + s.logger.WithField("name", updated.Name).Debug("Config watch channel received other config") return } @@ -256,7 +265,7 @@ func (s *server) watchConfig(config *api.Config) { } if !reflect.DeepEqual(config, updated) { - glog.Infof("Detected a config change. Gracefully shutting down") + s.logger.Info("Detected a config change. 
Gracefully shutting down") s.cancelFunc() } }, @@ -264,23 +273,23 @@ func (s *server) watchConfig(config *api.Config) { } func (s *server) initBackupService(config *api.Config) error { - glog.Infof("Configuring cloud provider for backup service") - objectStorage, err := getObjectStorageProvider(config.BackupStorageProvider.CloudProviderConfig, "backupStorageProvider") + s.logger.Info("Configuring cloud provider for backup service") + objectStorage, err := getObjectStorageProvider(config.BackupStorageProvider.CloudProviderConfig, "backupStorageProvider", s.logger) if err != nil { return err } - s.backupService = cloudprovider.NewBackupService(objectStorage) + s.backupService = cloudprovider.NewBackupService(objectStorage, s.logger) return nil } func (s *server) initSnapshotService(config *api.Config) error { if config.PersistentVolumeProvider == nil { - glog.Infof("PersistentVolumeProvider config not provided, volume snapshots and restores are disabled") + s.logger.Info("PersistentVolumeProvider config not provided, volume snapshots and restores are disabled") return nil } - glog.Infof("Configuring cloud provider for snapshot service") + s.logger.Info("Configuring cloud provider for snapshot service") blockStorage, err := getBlockStorageProvider(*config.PersistentVolumeProvider, "persistentVolumeProvider") if err != nil { return err @@ -313,14 +322,14 @@ func hasOneCloudProvider(cloudConfig api.CloudProviderConfig) bool { return found } -func getObjectStorageProvider(cloudConfig api.CloudProviderConfig, field string) (cloudprovider.ObjectStorageAdapter, error) { +func getObjectStorageProvider(cloudConfig api.CloudProviderConfig, field string, logger *logrus.Logger) (cloudprovider.ObjectStorageAdapter, error) { var ( objectStorage cloudprovider.ObjectStorageAdapter err error ) if !hasOneCloudProvider(cloudConfig) { - return nil, fmt.Errorf("you must specify exactly one of aws, gcp, or azure for %s", field) + return nil, errors.Errorf("you must specify exactly one of aws, gcp, or azure for %s", field) } switch { @@ -339,16 +348,16 @@ func getObjectStorageProvider(cloudConfig api.CloudProviderConfig, field string) // Get the email and private key from the credentials file so we can pre-sign download URLs creds, err := ioutil.ReadFile(credentialsFile) if err != nil { - return nil, err + return nil, errors.WithStack(err) } jwtConfig, err := google.JWTConfigFromJSON(creds) if err != nil { - return nil, err + return nil, errors.WithStack(err) } email = jwtConfig.Email privateKey = jwtConfig.PrivateKey } else { - glog.Warning("GOOGLE_APPLICATION_CREDENTIALS is undefined; some features such as downloading log files will not work") + logger.Warning("GOOGLE_APPLICATION_CREDENTIALS is undefined; some features such as downloading log files will not work") } objectStorage, err = gcp.NewObjectStorageAdapter(email, privateKey) @@ -370,7 +379,7 @@ func getBlockStorageProvider(cloudConfig api.CloudProviderConfig, field string) ) if !hasOneCloudProvider(cloudConfig) { - return nil, fmt.Errorf("you must specify exactly one of aws, gcp, or azure for %s", field) + return nil, errors.Errorf("you must specify exactly one of aws, gcp, or azure for %s", field) } switch { @@ -397,17 +406,18 @@ func durationMin(a, b time.Duration) time.Duration { } func (s *server) runControllers(config *api.Config) error { - glog.Infof("Starting controllers") + s.logger.Info("Starting controllers") ctx := s.ctx var wg sync.WaitGroup cloudBackupCacheResyncPeriod := durationMin(config.GCSyncPeriod.Duration, 
config.BackupSyncPeriod.Duration) - glog.Infof("Caching cloud backups every %s", cloudBackupCacheResyncPeriod) + s.logger.Infof("Caching cloud backups every %s", cloudBackupCacheResyncPeriod) s.backupService = cloudprovider.NewBackupServiceWithCachedBackupGetter( ctx, s.backupService, cloudBackupCacheResyncPeriod, + s.logger, ) backupSyncController := controller.NewBackupSyncController( @@ -415,6 +425,7 @@ func (s *server) runControllers(config *api.Config) error { s.backupService, config.BackupStorageProvider.Bucket, config.BackupSyncPeriod.Duration, + s.logger, ) wg.Add(1) go func() { @@ -422,14 +433,14 @@ func (s *server) runControllers(config *api.Config) error { wg.Done() }() - discoveryHelper, err := arkdiscovery.NewHelper(s.discoveryClient) + discoveryHelper, err := arkdiscovery.NewHelper(s.discoveryClient, s.logger) if err != nil { return err } go wait.Until( func() { if err := discoveryHelper.Refresh(); err != nil { - glog.Errorf("error refreshing discovery: %v", err) + s.logger.WithError(err).Error("Error refreshing discovery") } }, 5*time.Minute, @@ -437,7 +448,7 @@ func (s *server) runControllers(config *api.Config) error { ) if config.RestoreOnlyMode { - glog.Infof("Restore only mode - not starting the backup, schedule or GC controllers") + s.logger.Info("Restore only mode - not starting the backup, schedule or GC controllers") } else { backupper, err := newBackupper(discoveryHelper, s.clientPool, s.backupService, s.snapshotService) cmd.CheckError(err) @@ -448,6 +459,7 @@ func (s *server) runControllers(config *api.Config) error { s.backupService, config.BackupStorageProvider.Bucket, s.snapshotService != nil, + s.logger, ) wg.Add(1) go func() { @@ -460,6 +472,7 @@ func (s *server) runControllers(config *api.Config) error { s.arkClient.ArkV1(), s.sharedInformerFactory.Ark().V1().Schedules(), config.ScheduleSyncPeriod.Duration, + s.logger, ) wg.Add(1) go func() { @@ -476,6 +489,7 @@ func (s *server) runControllers(config *api.Config) error { s.arkClient.ArkV1(), s.sharedInformerFactory.Ark().V1().Restores(), s.arkClient.ArkV1(), + s.logger, ) wg.Add(1) go func() { @@ -492,6 +506,7 @@ func (s *server) runControllers(config *api.Config) error { config.ResourcePriorities, s.arkClient.ArkV1(), s.kubeClient, + s.logger, ) cmd.CheckError(err) @@ -504,6 +519,7 @@ func (s *server) runControllers(config *api.Config) error { config.BackupStorageProvider.Bucket, s.sharedInformerFactory.Ark().V1().Backups(), s.snapshotService != nil, + s.logger, ) wg.Add(1) go func() { @@ -516,6 +532,7 @@ func (s *server) runControllers(config *api.Config) error { s.sharedInformerFactory.Ark().V1().DownloadRequests(), s.backupService, config.BackupStorageProvider.Bucket, + s.logger, ) wg.Add(1) go func() { @@ -526,11 +543,11 @@ func (s *server) runControllers(config *api.Config) error { // SHARED INFORMERS HAVE TO BE STARTED AFTER ALL CONTROLLERS go s.sharedInformerFactory.Start(ctx.Done()) - glog.Infof("Server started successfully") + s.logger.Info("Server started successfully") <-ctx.Done() - glog.Info("Waiting for all controllers to shut down gracefully") + s.logger.Info("Waiting for all controllers to shut down gracefully") wg.Wait() return nil @@ -568,14 +585,15 @@ func newRestorer( resourcePriorities []string, backupClient arkv1client.BackupsGetter, kubeClient kubernetes.Interface, + logger *logrus.Logger, ) (restore.Restorer, error) { restorers := map[string]restorers.ResourceRestorer{ "persistentvolumes": restorers.NewPersistentVolumeRestorer(snapshotService), "persistentvolumeclaims": 
restorers.NewPersistentVolumeClaimRestorer(), "services": restorers.NewServiceRestorer(), "namespaces": restorers.NewNamespaceRestorer(), - "pods": restorers.NewPodRestorer(), - "jobs": restorers.NewJobRestorer(), + "pods": restorers.NewPodRestorer(logger), + "jobs": restorers.NewJobRestorer(logger), } return restore.NewKubernetesRestorer( @@ -586,5 +604,6 @@ func newRestorer( resourcePriorities, backupClient, kubeClient.CoreV1().Namespaces(), + logger, ) } diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 48a96995b..c63dcf41f 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -20,16 +20,20 @@ import ( "testing" "time" + "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/heptio/ark/pkg/apis/ark/v1" ) func TestApplyConfigDefaults(t *testing.T) { - c := &v1.Config{} + var ( + logger, _ = test.NewNullLogger() + c = &v1.Config{} + ) // test defaulting - applyConfigDefaults(c) + applyConfigDefaults(c, logger) assert.Equal(t, defaultGCSyncPeriod, c.GCSyncPeriod.Duration) assert.Equal(t, defaultBackupSyncPeriod, c.BackupSyncPeriod.Duration) assert.Equal(t, defaultScheduleSyncPeriod, c.ScheduleSyncPeriod.Duration) @@ -41,8 +45,7 @@ func TestApplyConfigDefaults(t *testing.T) { c.ScheduleSyncPeriod.Duration = 3 * time.Minute c.ResourcePriorities = []string{"a", "b"} - applyConfigDefaults(c) - + applyConfigDefaults(c, logger) assert.Equal(t, 5*time.Minute, c.GCSyncPeriod.Duration) assert.Equal(t, 4*time.Minute, c.BackupSyncPeriod.Duration) assert.Equal(t, 3*time.Minute, c.ScheduleSyncPeriod.Duration) diff --git a/pkg/cmd/util/downloadrequest/downloadrequest.go b/pkg/cmd/util/downloadrequest/downloadrequest.go index e67edd0cd..3df068d90 100644 --- a/pkg/cmd/util/downloadrequest/downloadrequest.go +++ b/pkg/cmd/util/downloadrequest/downloadrequest.go @@ -18,13 +18,14 @@ package downloadrequest import ( "compress/gzip" - "errors" "fmt" "io" "io/ioutil" "net/http" "time" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" @@ -48,7 +49,7 @@ func Stream(client arkclientv1.DownloadRequestsGetter, name string, kind v1.Down req, err := client.DownloadRequests(v1.DefaultNamespace).Create(req) if err != nil { - return err + return errors.WithStack(err) } defer client.DownloadRequests(v1.DefaultNamespace).Delete(req.Name, nil) @@ -59,7 +60,7 @@ func Stream(client arkclientv1.DownloadRequestsGetter, name string, kind v1.Down } watcher, err := client.DownloadRequests(v1.DefaultNamespace).Watch(listOptions) if err != nil { - return err + return errors.WithStack(err) } defer watcher.Stop() @@ -74,7 +75,7 @@ Loop: case e := <-watcher.ResultChan(): updated, ok := e.Object.(*v1.DownloadRequest) if !ok { - return fmt.Errorf("unexpected type %T", e.Object) + return errors.Errorf("unexpected type %T", e.Object) } if updated.Name != req.Name { @@ -118,10 +119,10 @@ Loop: if resp.StatusCode != http.StatusOK { body, err := ioutil.ReadAll(resp.Body) if err != nil { - return fmt.Errorf("request failed; unable to decode response body: %v", err) + return errors.Wrapf(err, "request failed; unable to decode response body") } - return fmt.Errorf("request failed: %v", string(body)) + return errors.Errorf("request failed: %v", string(body)) } reader := resp.Body diff --git a/pkg/cmd/util/flag/enum.go b/pkg/cmd/util/flag/enum.go index 841097e2b..2c02ef248 100644 --- a/pkg/cmd/util/flag/enum.go +++ b/pkg/cmd/util/flag/enum.go @@ -17,7 +17,7 @@ limitations under the License. 
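// Illustrative sketch, outside the patch, of the logging convention the server
// and controller hunks adopt: a *logrus.Logger is injected through constructors,
// context is attached with WithFields/WithError in place of glog format strings,
// and tests pass the silent logger from logrus/hooks/test. The thing type and
// newThing constructor are hypothetical names, not part of this commit.
package sketch

import (
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	logtest "github.com/sirupsen/logrus/hooks/test"
)

type thing struct {
	logger *logrus.Logger
}

func newThing(logger *logrus.Logger) *thing {
	return &thing{logger: logger}
}

func (t *thing) process(bucket, key string) {
	logContext := t.logger.WithFields(logrus.Fields{
		"bucket": bucket,
		"key":    key,
	})
	logContext.Debug("Trying to delete object") // replaces glog.V(4).Infof

	if err := errors.New("example failure"); err != nil {
		logContext.WithError(err).Error("Error deleting object") // replaces glog.Errorf
	}
}

// In tests, the null logger keeps output quiet while still exercising the code paths:
func newTestThing() *thing {
	logger, _ := logtest.NewNullLogger()
	return newThing(logger)
}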
package flag import ( - "fmt" + "github.com/pkg/errors" "k8s.io/apimachinery/pkg/util/sets" ) @@ -51,7 +51,7 @@ func (e *Enum) String() string { // is not an allowed value. func (e *Enum) Set(s string) error { if !e.allowedValues.Has(s) { - return fmt.Errorf("invalid value: %q", s) + return errors.Errorf("invalid value: %q", s) } e.value = s diff --git a/pkg/cmd/util/flag/map.go b/pkg/cmd/util/flag/map.go index d5bced9cb..8f599d6f7 100644 --- a/pkg/cmd/util/flag/map.go +++ b/pkg/cmd/util/flag/map.go @@ -19,6 +19,8 @@ package flag import ( "fmt" "strings" + + "github.com/pkg/errors" ) // Map is a Cobra-compatible wrapper for defining a flag containing @@ -73,7 +75,7 @@ func (m *Map) Set(s string) error { for _, part := range strings.Split(s, m.entryDelimiter) { kvs := strings.SplitN(part, m.keyValueDelimiter, 2) if len(kvs) != 2 { - return fmt.Errorf("error parsing %q", part) + return errors.Errorf("error parsing %q", part) } m.data[kvs[0]] = kvs[1] } diff --git a/pkg/cmd/util/output/output.go b/pkg/cmd/util/output/output.go index 1031e32ad..fc6a917bb 100644 --- a/pkg/cmd/util/output/output.go +++ b/pkg/cmd/util/output/output.go @@ -20,6 +20,7 @@ import ( "fmt" "os" + "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -85,7 +86,7 @@ func validateOutputFlag(cmd *cobra.Command) error { switch output { case "", "table", "json", "yaml": default: - return fmt.Errorf("invalid output format %q - valid values are 'table', 'json', and 'yaml'", output) + return errors.Errorf("invalid output format %q - valid values are 'table', 'json', and 'yaml'", output) } return nil } @@ -105,7 +106,7 @@ func PrintWithFormat(c *cobra.Command, obj runtime.Object) (bool, error) { return printEncoded(obj, format) } - return false, fmt.Errorf("unsupported output format %q; valid values are 'table', 'json', and 'yaml'", format) + return false, errors.Errorf("unsupported output format %q; valid values are 'table', 'json', and 'yaml'", format) } func printEncoded(obj runtime.Object, format string) (bool, error) { diff --git a/pkg/controller/backup_controller.go b/pkg/controller/backup_controller.go index 95221368b..58e22fb22 100644 --- a/pkg/controller/backup_controller.go +++ b/pkg/controller/backup_controller.go @@ -19,14 +19,14 @@ package controller import ( "bytes" "context" - "errors" "fmt" "io/ioutil" "os" "sync" "time" - "github.com/golang/glog" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/clock" @@ -44,6 +44,7 @@ import ( listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" "github.com/heptio/ark/pkg/util/collections" "github.com/heptio/ark/pkg/util/encode" + kubeutil "github.com/heptio/ark/pkg/util/kube" ) const backupVersion = 1 @@ -53,14 +54,13 @@ type backupController struct { backupService cloudprovider.BackupService bucket string pvProviderExists bool - - lister listers.BackupLister - listerSynced cache.InformerSynced - client arkv1client.BackupsGetter - syncHandler func(backupName string) error - queue workqueue.RateLimitingInterface - - clock clock.Clock + lister listers.BackupLister + listerSynced cache.InformerSynced + client arkv1client.BackupsGetter + syncHandler func(backupName string) error + queue workqueue.RateLimitingInterface + clock clock.Clock + logger *logrus.Logger } func NewBackupController( @@ -70,19 +70,19 @@ func NewBackupController( backupService cloudprovider.BackupService, bucket string, pvProviderExists bool, + logger *logrus.Logger, ) Interface { c := 
&backupController{ backupper: backupper, backupService: backupService, bucket: bucket, pvProviderExists: pvProviderExists, - - lister: backupInformer.Lister(), - listerSynced: backupInformer.Informer().HasSynced, - client: client, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "backup"), - - clock: &clock.RealClock{}, + lister: backupInformer.Lister(), + listerSynced: backupInformer.Informer().HasSynced, + client: client, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "backup"), + clock: &clock.RealClock{}, + logger: logger, } c.syncHandler = c.processBackup @@ -96,13 +96,16 @@ func NewBackupController( case "", api.BackupPhaseNew: // only process new backups default: - glog.V(4).Infof("Backup %s/%s has phase %s - skipping", backup.Namespace, backup.Name, backup.Status.Phase) + c.logger.WithFields(logrus.Fields{ + "backup": kubeutil.NamespaceAndName(backup), + "phase": backup.Status.Phase, + }).Debug("Backup is not new, skipping") return } key, err := cache.MetaNamespaceKeyFunc(backup) if err != nil { - glog.Errorf("error creating queue key for %#v: %v", backup, err) + c.logger.WithError(err).WithField("backup", backup).Error("Error creating queue key, item not added to queue") return } c.queue.Add(key) @@ -120,7 +123,7 @@ func (controller *backupController) Run(ctx context.Context, numWorkers int) err var wg sync.WaitGroup defer func() { - glog.Infof("Waiting for workers to finish their work") + controller.logger.Info("Waiting for workers to finish their work") controller.queue.ShutDown() @@ -129,17 +132,18 @@ func (controller *backupController) Run(ctx context.Context, numWorkers int) err // we want to shut down the queue via defer and not at the end of the body. wg.Wait() - glog.Infof("All workers have finished") + controller.logger.Info("All workers have finished") + }() - glog.Info("Starting BackupController") - defer glog.Infof("Shutting down BackupController") + controller.logger.Info("Starting BackupController") + defer controller.logger.Info("Shutting down BackupController") - glog.Info("Waiting for caches to sync") + controller.logger.Info("Waiting for caches to sync") if !cache.WaitForCacheSync(ctx.Done(), controller.listerSynced) { return errors.New("timed out waiting for caches to sync") } - glog.Info("Caches are synced") + controller.logger.Info("Caches are synced") wg.Add(numWorkers) for i := 0; i < numWorkers; i++ { @@ -178,7 +182,7 @@ func (controller *backupController) processNextWorkItem() bool { return true } - glog.Errorf("syncHandler error: %v", err) + controller.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue") // we had an error processing the item so add it back // into the queue for re-processing with rate-limiting controller.queue.AddRateLimited(key) @@ -187,18 +191,18 @@ func (controller *backupController) processNextWorkItem() bool { } func (controller *backupController) processBackup(key string) error { - glog.V(4).Infof("processBackup for key %q", key) + logContext := controller.logger.WithField("key", key) + + logContext.Debug("Running processBackup") ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - glog.V(4).Infof("error splitting key %q: %v", key, err) - return err + return errors.Wrap(err, "error splitting queue key") } - glog.V(4).Infof("Getting backup %s", key) + logContext.Debug("Getting backup") backup, err := controller.lister.Backups(ns).Get(name) if err != nil { - glog.V(4).Infof("error getting backup 
%s: %v", key, err) - return err + return errors.Wrap(err, "error getting backup") } // TODO I think this is now unnecessary. We only initially place @@ -215,11 +219,10 @@ func (controller *backupController) processBackup(key string) error { return nil } - glog.V(4).Infof("Cloning backup %s", key) + logContext.Debug("Cloning backup") // don't modify items in the cache backup, err = cloneBackup(backup) if err != nil { - glog.V(4).Infof("error cloning backup %s: %v", key, err) return err } @@ -251,8 +254,7 @@ func (controller *backupController) processBackup(key string) error { // update status updatedBackup, err := controller.client.Backups(ns).Update(backup) if err != nil { - glog.V(4).Infof("error updating status to %s: %v", backup.Status.Phase, err) - return err + return errors.Wrapf(err, "error updating Backup status to %s", backup.Status.Phase) } backup = updatedBackup @@ -260,16 +262,16 @@ func (controller *backupController) processBackup(key string) error { return nil } - glog.V(4).Infof("running backup for %s", key) + logContext.Debug("Running backup") // execution & upload of backup if err := controller.runBackup(backup, controller.bucket); err != nil { - glog.V(4).Infof("backup %s failed: %v", key, err) + logContext.WithError(err).Error("backup failed") backup.Status.Phase = api.BackupPhaseFailed } - glog.V(4).Infof("updating backup %s final status", key) + logContext.Debug("Updating backup's final status") if _, err = controller.client.Backups(ns).Update(backup); err != nil { - glog.V(4).Infof("error updating backup %s final status: %v", key, err) + logContext.WithError(err).Error("error updating backup's final status") } return nil @@ -278,12 +280,12 @@ func (controller *backupController) processBackup(key string) error { func cloneBackup(in interface{}) (*api.Backup, error) { clone, err := scheme.Scheme.DeepCopy(in) if err != nil { - return nil, err + return nil, errors.Wrap(err, "error deep-copying Backup") } out, ok := clone.(*api.Backup) if !ok { - return nil, fmt.Errorf("unexpected type: %T", clone) + return nil, errors.Errorf("unexpected type: %T", clone) } return out, nil @@ -310,32 +312,33 @@ func (controller *backupController) getValidationErrors(itm *api.Backup) []strin func (controller *backupController) runBackup(backup *api.Backup, bucket string) error { backupFile, err := ioutil.TempFile("", "") if err != nil { - return err + return errors.Wrap(err, "error creating temp file for Backup") } logFile, err := ioutil.TempFile("", "") if err != nil { - return err + return errors.Wrap(err, "error creating temp file for Backup log") } defer func() { var errs []error + // TODO should this be wrapped? 
errs = append(errs, err) if err := backupFile.Close(); err != nil { - errs = append(errs, err) + errs = append(errs, errors.Wrap(err, "error closing Backup temp file")) } if err := os.Remove(backupFile.Name()); err != nil { - errs = append(errs, err) + errs = append(errs, errors.Wrap(err, "error removing Backup temp file")) } if err := logFile.Close(); err != nil { - errs = append(errs, err) + errs = append(errs, errors.Wrap(err, "error closing Backup log temp file")) } if err := os.Remove(logFile.Name()); err != nil { - errs = append(errs, err) + errs = append(errs, errors.Wrap(err, "error removing Backup log temp file")) } err = kuberrs.NewAggregate(errs) @@ -344,23 +347,23 @@ func (controller *backupController) runBackup(backup *api.Backup, bucket string) if err := controller.backupper.Backup(backup, backupFile, logFile); err != nil { return err } + controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup)).Info("backup completed") // note: updating this here so the uploaded JSON shows "completed". If // the upload fails, we'll alter the phase in the calling func. - glog.V(4).Infof("backup %s/%s completed", backup.Namespace, backup.Name) backup.Status.Phase = api.BackupPhaseCompleted buf := new(bytes.Buffer) if err := encode.EncodeTo(backup, "json", buf); err != nil { - return err + return errors.Wrap(err, "error encoding Backup") } // re-set the files' offset to 0 for reading if _, err = backupFile.Seek(0, 0); err != nil { - return err + return errors.Wrap(err, "error resetting Backup file offset") } if _, err = logFile.Seek(0, 0); err != nil { - return err + return errors.Wrap(err, "error resetting Backup log file offset") } return controller.backupService.UploadBackup(bucket, backup.Name, bytes.NewReader(buf.Bytes()), backupFile, logFile) diff --git a/pkg/controller/backup_controller_test.go b/pkg/controller/backup_controller_test.go index 938e0e07b..5bdd228e8 100644 --- a/pkg/controller/backup_controller_test.go +++ b/pkg/controller/backup_controller_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/util/clock" core "k8s.io/client-go/testing" + testlogger "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -146,18 +147,15 @@ func TestProcessBackup(t *testing.T) { }, } - // flag.Set("logtostderr", "true") - // flag.Set("v", "4") - for _, test := range tests { t.Run(test.name, func(t *testing.T) { - client := fake.NewSimpleClientset() - - backupper := &fakeBackupper{} - - cloudBackups := &BackupService{} - - sharedInformers := informers.NewSharedInformerFactory(client, 0) + var ( + client = fake.NewSimpleClientset() + backupper = &fakeBackupper{} + cloudBackups = &BackupService{} + sharedInformers = informers.NewSharedInformerFactory(client, 0) + logger, _ = testlogger.NewNullLogger() + ) c := NewBackupController( sharedInformers.Ark().V1().Backups(), @@ -166,6 +164,7 @@ func TestProcessBackup(t *testing.T) { cloudBackups, "bucket", test.allowSnapshots, + logger, ).(*backupController) c.clock = clock.NewFakeClock(time.Now()) diff --git a/pkg/controller/backup_sync_controller.go b/pkg/controller/backup_sync_controller.go index 042db1bc5..7f193196e 100644 --- a/pkg/controller/backup_sync_controller.go +++ b/pkg/controller/backup_sync_controller.go @@ -20,13 +20,15 @@ import ( "context" "time" - "github.com/golang/glog" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/api/errors" + kuberrs "k8s.io/apimachinery/pkg/api/errors" 
"k8s.io/apimachinery/pkg/util/wait" "github.com/heptio/ark/pkg/cloudprovider" arkv1client "github.com/heptio/ark/pkg/generated/clientset/typed/ark/v1" + "github.com/heptio/ark/pkg/util/kube" ) type backupSyncController struct { @@ -34,11 +36,18 @@ type backupSyncController struct { backupService cloudprovider.BackupService bucket string syncPeriod time.Duration + logger *logrus.Logger } -func NewBackupSyncController(client arkv1client.BackupsGetter, backupService cloudprovider.BackupService, bucket string, syncPeriod time.Duration) Interface { +func NewBackupSyncController( + client arkv1client.BackupsGetter, + backupService cloudprovider.BackupService, + bucket string, + syncPeriod time.Duration, + logger *logrus.Logger, +) Interface { if syncPeriod < time.Minute { - glog.Infof("Backup sync period %v is too short. Setting to 1 minute", syncPeriod) + logger.Infof("Provided backup sync period %v is too short. Setting to 1 minute", syncPeriod) syncPeriod = time.Minute } return &backupSyncController{ @@ -46,6 +55,7 @@ func NewBackupSyncController(client arkv1client.BackupsGetter, backupService clo backupService: backupService, bucket: bucket, syncPeriod: syncPeriod, + logger: logger, } } @@ -53,25 +63,27 @@ func NewBackupSyncController(client arkv1client.BackupsGetter, backupService clo // sync process according to the controller's syncPeriod. It will return when it // receives on the ctx.Done() channel. func (c *backupSyncController) Run(ctx context.Context, workers int) error { - glog.Info("Running backup sync controller") + c.logger.Info("Running backup sync controller") wait.Until(c.run, c.syncPeriod, ctx.Done()) return nil } func (c *backupSyncController) run() { - glog.Info("Syncing backups from object storage") + c.logger.Info("Syncing backups from object storage") backups, err := c.backupService.GetAllBackups(c.bucket) if err != nil { - glog.Errorf("error listing backups: %v", err) + c.logger.WithError(err).Error("error listing backups") return } - glog.Infof("Found %d backups", len(backups)) + c.logger.WithField("backupCount", len(backups)).Info("Got backups from object storage") for _, cloudBackup := range backups { - glog.Infof("Syncing backup %s/%s", cloudBackup.Namespace, cloudBackup.Name) + logContext := c.logger.WithField("backup", kube.NamespaceAndName(cloudBackup)) + logContext.Info("Syncing backup") + cloudBackup.ResourceVersion = "" - if _, err := c.client.Backups(cloudBackup.Namespace).Create(cloudBackup); err != nil && !errors.IsAlreadyExists(err) { - glog.Errorf("error syncing backup %s/%s from object storage: %v", cloudBackup.Namespace, cloudBackup.Name, err) + if _, err := c.client.Backups(cloudBackup.Namespace).Create(cloudBackup); err != nil && !kuberrs.IsAlreadyExists(err) { + logContext.WithError(errors.WithStack(err)).Error("Error syncing backup from object storage") } } } diff --git a/pkg/controller/backup_sync_controller_test.go b/pkg/controller/backup_sync_controller_test.go index f75c694a1..c956ca0af 100644 --- a/pkg/controller/backup_sync_controller_test.go +++ b/pkg/controller/backup_sync_controller_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + testlogger "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" core "k8s.io/client-go/testing" @@ -55,14 +56,18 @@ func TestBackupSyncControllerRun(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - bs := &BackupService{} - client := fake.NewSimpleClientset() + var ( + bs = &BackupService{} + client = fake.NewSimpleClientset() + logger, _ = 
testlogger.NewNullLogger() + ) c := NewBackupSyncController( client.ArkV1(), bs, "bucket", time.Duration(0), + logger, ).(*backupSyncController) bs.On("GetAllBackups", "bucket").Return(test.cloudBackups, test.getAllBackupsError) diff --git a/pkg/controller/download_request_controller.go b/pkg/controller/download_request_controller.go index 501782b64..be553c7cb 100644 --- a/pkg/controller/download_request_controller.go +++ b/pkg/controller/download_request_controller.go @@ -18,18 +18,16 @@ package controller import ( "context" - "errors" - "fmt" "sync" "time" - "github.com/golang/glog" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/clock" - "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -40,20 +38,19 @@ import ( arkv1client "github.com/heptio/ark/pkg/generated/clientset/typed/ark/v1" informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" + "github.com/heptio/ark/pkg/util/kube" ) type downloadRequestController struct { downloadRequestClient arkv1client.DownloadRequestsGetter downloadRequestLister listers.DownloadRequestLister downloadRequestListerSynced cache.InformerSynced - - backupService cloudprovider.BackupService - bucket string - - syncHandler func(key string) error - queue workqueue.RateLimitingInterface - - clock clock.Clock + backupService cloudprovider.BackupService + bucket string + syncHandler func(key string) error + queue workqueue.RateLimitingInterface + clock clock.Clock + logger *logrus.Logger } // NewDownloadRequestController creates a new DownloadRequestController. @@ -62,18 +59,17 @@ func NewDownloadRequestController( downloadRequestInformer informers.DownloadRequestInformer, backupService cloudprovider.BackupService, bucket string, + logger *logrus.Logger, ) Interface { c := &downloadRequestController{ downloadRequestClient: downloadRequestClient, downloadRequestLister: downloadRequestInformer.Lister(), downloadRequestListerSynced: downloadRequestInformer.Informer().HasSynced, - - backupService: backupService, - bucket: bucket, - - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "downloadrequest"), - - clock: &clock.RealClock{}, + backupService: backupService, + bucket: bucket, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "downloadrequest"), + clock: &clock.RealClock{}, + logger: logger, } c.syncHandler = c.processDownloadRequest @@ -83,7 +79,10 @@ func NewDownloadRequestController( AddFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { - runtime.HandleError(fmt.Errorf("error creating queue key for %#v: %v", obj, err)) + downloadRequest := obj.(*v1.DownloadRequest) + c.logger.WithError(errors.WithStack(err)). + WithField("downloadRequest", downloadRequest.Name). 
+ Error("Error creating queue key, item not added to queue") return } c.queue.Add(key) @@ -101,7 +100,7 @@ func (c *downloadRequestController) Run(ctx context.Context, numWorkers int) err var wg sync.WaitGroup defer func() { - glog.Infof("Waiting for workers to finish their work") + c.logger.Info("Waiting for workers to finish their work") c.queue.ShutDown() @@ -110,17 +109,17 @@ func (c *downloadRequestController) Run(ctx context.Context, numWorkers int) err // we want to shut down the queue via defer and not at the end of the body. wg.Wait() - glog.Infof("All workers have finished") + c.logger.Info("All workers have finished") }() - glog.Info("Starting DownloadRequestController") - defer glog.Infof("Shutting down DownloadRequestController") + c.logger.Info("Starting DownloadRequestController") + defer c.logger.Info("Shutting down DownloadRequestController") - glog.Info("Waiting for caches to sync") + c.logger.Info("Waiting for caches to sync") if !cache.WaitForCacheSync(ctx.Done(), c.downloadRequestListerSynced) { return errors.New("timed out waiting for caches to sync") } - glog.Info("Caches are synced") + c.logger.Info("Caches are synced") wg.Add(numWorkers) for i := 0; i < numWorkers; i++ { @@ -167,7 +166,8 @@ func (c *downloadRequestController) processNextWorkItem() bool { return true } - glog.Errorf("syncHandler error: %v", err) + c.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue") + // we had an error processing the item so add it back // into the queue for re-processing with rate-limiting c.queue.AddRateLimited(key) @@ -178,21 +178,21 @@ func (c *downloadRequestController) processNextWorkItem() bool { // processDownloadRequest is the default per-item sync handler. It generates a pre-signed URL for // a new DownloadRequest or deletes the DownloadRequest if it has expired. func (c *downloadRequestController) processDownloadRequest(key string) error { - glog.V(4).Infof("processDownloadRequest for key %q", key) + logContext := c.logger.WithField("key", key) + + logContext.Debug("Running processDownloadRequest") ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - glog.V(4).Infof("error splitting key %q: %v", key, err) - return err + return errors.Wrap(err, "error splitting queue key") } downloadRequest, err := c.downloadRequestLister.DownloadRequests(ns).Get(name) if apierrors.IsNotFound(err) { - glog.V(4).Infof("unable to find DownloadRequest %q", key) + logContext.Debug("Unable to find DownloadRequest") return nil } if err != nil { - glog.V(4).Infof("error getting DownloadRequest %q: %v", key, err) - return err + return errors.Wrap(err, "error getting DownloadRequest") } switch downloadRequest.Status.Phase { @@ -224,20 +224,20 @@ func (c *downloadRequestController) generatePreSignedURL(downloadRequest *v1.Dow update.Status.Expiration = metav1.NewTime(c.clock.Now().Add(signedURLTTL)) _, err = c.downloadRequestClient.DownloadRequests(update.Namespace).Update(update) - return err - + return errors.WithStack(err) } // deleteIfExpired deletes downloadRequest if it has expired. 
func (c *downloadRequestController) deleteIfExpired(downloadRequest *v1.DownloadRequest) error { - glog.V(4).Infof("checking for expiration of %s/%s", downloadRequest.Namespace, downloadRequest.Name) + logContext := c.logger.WithField("key", kube.NamespaceAndName(downloadRequest)) + logContext.Info("checking for expiration of DownloadRequest") if downloadRequest.Status.Expiration.Time.Before(c.clock.Now()) { - glog.V(4).Infof("%s/%s has not expired", downloadRequest.Namespace, downloadRequest.Name) + logContext.Debug("DownloadRequest has not expired") return nil } - glog.V(4).Infof("%s/%s has expired - deleting", downloadRequest.Namespace, downloadRequest.Name) - return c.downloadRequestClient.DownloadRequests(downloadRequest.Namespace).Delete(downloadRequest.Name, nil) + logContext.Debug("DownloadRequest has expired - deleting") + return errors.WithStack(c.downloadRequestClient.DownloadRequests(downloadRequest.Namespace).Delete(downloadRequest.Name, nil)) } // resync requeues all the DownloadRequests in the lister's cache. This is mostly to handle deleting @@ -245,14 +245,14 @@ func (c *downloadRequestController) deleteIfExpired(downloadRequest *v1.Download func (c *downloadRequestController) resync() { list, err := c.downloadRequestLister.List(labels.Everything()) if err != nil { - runtime.HandleError(fmt.Errorf("error listing download requests: %v", err)) + c.logger.WithError(errors.WithStack(err)).Error("error listing download requests") return } for _, dr := range list { key, err := cache.MetaNamespaceKeyFunc(dr) if err != nil { - runtime.HandleError(fmt.Errorf("error generating key for download request %#v: %v", dr, err)) + c.logger.WithError(errors.WithStack(err)).WithField("downloadRequest", dr.Name).Error("error generating key for download request") continue } @@ -264,12 +264,12 @@ func (c *downloadRequestController) resync() { func cloneDownloadRequest(in *v1.DownloadRequest) (*v1.DownloadRequest, error) { clone, err := scheme.Scheme.DeepCopy(in) if err != nil { - return nil, err + return nil, errors.Wrap(err, "error deep-copying DownloadRequest") } out, ok := clone.(*v1.DownloadRequest) if !ok { - return nil, fmt.Errorf("unexpected type: %T", clone) + return nil, errors.Errorf("unexpected type: %T", clone) } return out, nil diff --git a/pkg/controller/download_request_controller_test.go b/pkg/controller/download_request_controller_test.go index 355ccd874..6c2c96123 100644 --- a/pkg/controller/download_request_controller_test.go +++ b/pkg/controller/download_request_controller_test.go @@ -20,6 +20,10 @@ import ( "testing" "time" + testlogger "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" core "k8s.io/client-go/testing" @@ -28,8 +32,6 @@ import ( "github.com/heptio/ark/pkg/generated/clientset/fake" informers "github.com/heptio/ark/pkg/generated/informers/externalversions" "github.com/heptio/ark/pkg/util/test" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestProcessDownloadRequest(t *testing.T) { @@ -50,7 +52,7 @@ func TestProcessDownloadRequest(t *testing.T) { { name: "bad key format", key: "a/b/c", - expectedError: `unexpected key format: "a/b/c"`, + expectedError: `error splitting queue key: unexpected key format: "a/b/c"`, }, { name: "backup log request with phase '' gets a url", @@ -92,10 +94,13 @@ func TestProcessDownloadRequest(t *testing.T) { for _, tc := range tests { 
t.Run(tc.name, func(t *testing.T) { - client := fake.NewSimpleClientset() - sharedInformers := informers.NewSharedInformerFactory(client, 0) - downloadRequestsInformer := sharedInformers.Ark().V1().DownloadRequests() - backupService := &test.BackupService{} + var ( + client = fake.NewSimpleClientset() + sharedInformers = informers.NewSharedInformerFactory(client, 0) + downloadRequestsInformer = sharedInformers.Ark().V1().DownloadRequests() + backupService = &test.BackupService{} + logger, _ = testlogger.NewNullLogger() + ) defer backupService.AssertExpectations(t) c := NewDownloadRequestController( @@ -103,6 +108,7 @@ func TestProcessDownloadRequest(t *testing.T) { downloadRequestsInformer, backupService, "bucket", + logger, ).(*downloadRequestController) if tc.expectedPhase == v1.DownloadRequestPhaseProcessed { diff --git a/pkg/controller/gc_controller.go b/pkg/controller/gc_controller.go index b9870a741..f4e8dfd41 100644 --- a/pkg/controller/gc_controller.go +++ b/pkg/controller/gc_controller.go @@ -18,10 +18,10 @@ package controller import ( "context" - "errors" "time" - "github.com/golang/glog" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -50,6 +50,7 @@ type gcController struct { restoreLister listers.RestoreLister restoreListerSynced cache.InformerSynced restoreClient arkv1client.RestoresGetter + logger *logrus.Logger } // NewGCController constructs a new gcController. @@ -62,9 +63,10 @@ func NewGCController( backupClient arkv1client.BackupsGetter, restoreInformer informers.RestoreInformer, restoreClient arkv1client.RestoresGetter, + logger *logrus.Logger, ) Interface { if syncPeriod < time.Minute { - glog.Infof("GC sync period %v is too short. Setting to 1 minute", syncPeriod) + logger.WithField("syncPeriod", syncPeriod).Info("Provided GC sync period is too short. Setting to 1 minute") syncPeriod = time.Minute } @@ -80,6 +82,7 @@ func NewGCController( restoreLister: restoreInformer.Lister(), restoreListerSynced: restoreInformer.Informer().HasSynced, restoreClient: restoreClient, + logger: logger, } } @@ -89,11 +92,11 @@ var _ Interface = &gcController{} // from object/block storage and the Ark API. It will return when it receives on the // ctx.Done() channel. func (c *gcController) Run(ctx context.Context, workers int) error { - glog.Info("Waiting for caches to sync") + c.logger.Info("Waiting for caches to sync") if !cache.WaitForCacheSync(ctx.Done(), c.backupListerSynced, c.restoreListerSynced) { return errors.New("timed out waiting for caches to sync") } - glog.Info("Caches are synced") + c.logger.Info("Caches are synced") wait.Until(c.run, c.syncPeriod, ctx.Done()) return nil @@ -107,11 +110,12 @@ func (c *gcController) run() { // deleteBackupFiles = true), volume snapshots, restore API objects, and the backup API object // itself. func (c *gcController) garbageCollectBackup(backup *api.Backup, deleteBackupFiles bool) { + logContext := c.logger.WithField("backup", kube.NamespaceAndName(backup)) + // if the backup includes snapshots but we don't currently have a PVProvider, we don't // want to orphan the snapshots so skip garbage-collection entirely. 
if c.snapshotService == nil && len(backup.Status.VolumeBackups) > 0 { - glog.Warningf("Cannot garbage-collect backup %s because backup includes snapshots and server is not configured with PersistentVolumeProvider", - kube.NamespaceAndName(backup)) + logContext.Warning("Cannot garbage-collect backup because backup includes snapshots and server is not configured with PersistentVolumeProvider") return } @@ -121,9 +125,9 @@ func (c *gcController) garbageCollectBackup(backup *api.Backup, deleteBackupFile deletionFailure := false for _, volumeBackup := range backup.Status.VolumeBackups { - glog.Infof("Removing snapshot %s associated with backup %s", volumeBackup.SnapshotID, kube.NamespaceAndName(backup)) + logContext.WithField("snapshotID", volumeBackup.SnapshotID).Info("Removing snapshot associated with backup") if err := c.snapshotService.DeleteSnapshot(volumeBackup.SnapshotID); err != nil { - glog.Errorf("error deleting snapshot %v: %v", volumeBackup.SnapshotID, err) + logContext.WithError(err).WithField("snapshotID", volumeBackup.SnapshotID).Error("Error deleting snapshot") deletionFailure = true } } @@ -131,35 +135,36 @@ func (c *gcController) garbageCollectBackup(backup *api.Backup, deleteBackupFile // If applicable, delete everything in the backup dir in object storage *before* deleting the API object // because otherwise the backup sync controller could re-sync the backup from object storage. if deleteBackupFiles { - glog.Infof("Removing backup %s from object storage", kube.NamespaceAndName(backup)) + logContext.Info("Removing backup from object storage") if err := c.backupService.DeleteBackupDir(c.bucket, backup.Name); err != nil { - glog.Errorf("error deleting backup %s: %v", kube.NamespaceAndName(backup), err) + logContext.WithError(err).Error("Error deleting backup") deletionFailure = true } } - glog.Infof("Getting restore API objects referencing backup %s", kube.NamespaceAndName(backup)) + logContext.Info("Getting restore API objects referencing backup") if restores, err := c.restoreLister.Restores(backup.Namespace).List(labels.Everything()); err != nil { - glog.Errorf("error getting restore API objects: %v", err) + logContext.WithError(errors.WithStack(err)).Error("Error getting Restore API objects") } else { for _, restore := range restores { if restore.Spec.BackupName == backup.Name { - glog.Infof("Removing restore API object %s of backup %s", kube.NamespaceAndName(restore), kube.NamespaceAndName(backup)) + logContext.WithField("restore", kube.NamespaceAndName(restore)).Info("Removing Restore API object referencing Backup") if err := c.restoreClient.Restores(restore.Namespace).Delete(restore.Name, &metav1.DeleteOptions{}); err != nil { - glog.Errorf("error deleting restore API object %s: %v", kube.NamespaceAndName(restore), err) + logContext.WithError(errors.WithStack(err)).WithField("restore", kube.NamespaceAndName(restore)). 
+ Error("Error deleting Restore API object") } } } } if deletionFailure { - glog.Warningf("Backup %s will not be deleted due to errors deleting related object storage files(s) and/or volume snapshots", kube.NamespaceAndName(backup)) + logContext.Warning("Backup will not be deleted due to errors deleting related object storage files(s) and/or volume snapshots") return } - glog.Infof("Removing backup API object %s", kube.NamespaceAndName(backup)) + logContext.Info("Removing Backup API object") if err := c.backupClient.Backups(backup.Namespace).Delete(backup.Name, &metav1.DeleteOptions{}); err != nil { - glog.Errorf("error deleting backup API object %s: %v", kube.NamespaceAndName(backup), err) + logContext.WithError(errors.WithStack(err)).Error("Error deleting Backup API object") } } @@ -168,7 +173,7 @@ func (c *gcController) garbageCollectBackup(backup *api.Backup, deleteBackupFile func (c *gcController) garbageCollectBackups(backups []*api.Backup, expiration time.Time, deleteBackupFiles bool) { for _, backup := range backups { if backup.Status.Expiration.Time.After(expiration) { - glog.Infof("Backup %s has not expired yet, skipping", kube.NamespaceAndName(backup)) + c.logger.WithField("backup", kube.NamespaceAndName(backup)).Info("Backup has not expired yet, skipping") continue } @@ -180,13 +185,13 @@ func (c *gcController) garbageCollectBackups(backups []*api.Backup, expiration t // them for garbage-collection. func (c *gcController) processBackups() { now := c.clock.Now() - glog.Infof("garbage-collecting backups that have expired as of %v", now) + c.logger.WithField("now", now).Info("Garbage-collecting backups that have expired as of now") // GC backups in object storage. We do this in addition // to GC'ing API objects to prevent orphan backup files. 
backups, err := c.backupService.GetAllBackups(c.bucket) if err != nil { - glog.Errorf("error getting all backups from object storage: %v", err) + c.logger.WithError(err).Error("Error getting all backups from object storage") return } c.garbageCollectBackups(backups, now, true) @@ -194,7 +199,7 @@ func (c *gcController) processBackups() { // GC backups without files in object storage apiBackups, err := c.backupLister.List(labels.Everything()) if err != nil { - glog.Errorf("error getting all backup API objects: %v", err) + c.logger.WithError(errors.WithStack(err)).Error("Error getting all Backup API objects") return } c.garbageCollectBackups(apiBackups, now, false) diff --git a/pkg/controller/gc_controller_test.go b/pkg/controller/gc_controller_test.go index 382760664..b5109e09b 100644 --- a/pkg/controller/gc_controller_test.go +++ b/pkg/controller/gc_controller_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + testlogger "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/util/clock" @@ -151,6 +152,7 @@ func TestGarbageCollect(t *testing.T) { sharedInformers = informers.NewSharedInformerFactory(client, 0) snapSvc cloudprovider.SnapshotService bucket = "bucket" + logger, _ = testlogger.NewNullLogger() ) if snapshotService != nil { @@ -166,6 +168,7 @@ func TestGarbageCollect(t *testing.T) { client.ArkV1(), sharedInformers.Ark().V1().Restores(), client.ArkV1(), + logger, ).(*gcController) controller.clock = fakeClock @@ -234,6 +237,7 @@ func TestGarbageCollectBackup(t *testing.T) { client = fake.NewSimpleClientset() sharedInformers = informers.NewSharedInformerFactory(client, 0) bucket = "bucket-1" + logger, _ = testlogger.NewNullLogger() controller = NewGCController( backupService, snapshotService, @@ -243,6 +247,7 @@ func TestGarbageCollectBackup(t *testing.T) { client.ArkV1(), sharedInformers.Ark().V1().Restores(), client.ArkV1(), + logger, ).(*gcController) ) @@ -316,6 +321,7 @@ func TestGarbageCollectPicksUpBackupUponExpiration(t *testing.T) { var ( client = fake.NewSimpleClientset() sharedInformers = informers.NewSharedInformerFactory(client, 0) + logger, _ = testlogger.NewNullLogger() ) controller := NewGCController( @@ -327,6 +333,7 @@ func TestGarbageCollectPicksUpBackupUponExpiration(t *testing.T) { client.ArkV1(), sharedInformers.Ark().V1().Restores(), client.ArkV1(), + logger, ).(*gcController) controller.clock = fakeClock diff --git a/pkg/controller/restore_controller.go b/pkg/controller/restore_controller.go index 3dbefba11..e912ee09e 100644 --- a/pkg/controller/restore_controller.go +++ b/pkg/controller/restore_controller.go @@ -18,7 +18,6 @@ package controller import ( "context" - "errors" "fmt" "io" "io/ioutil" @@ -26,7 +25,8 @@ import ( "sync" "time" - "github.com/golang/glog" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/wait" @@ -41,22 +41,23 @@ import ( listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" "github.com/heptio/ark/pkg/restore" "github.com/heptio/ark/pkg/util/collections" + kubeutil "github.com/heptio/ark/pkg/util/kube" ) type restoreController struct { - restoreClient arkv1client.RestoresGetter - backupClient arkv1client.BackupsGetter - restorer restore.Restorer - backupService cloudprovider.BackupService - bucket string - pvProviderExists bool - + restoreClient arkv1client.RestoresGetter + backupClient arkv1client.BackupsGetter + restorer restore.Restorer + backupService cloudprovider.BackupService + bucket 
string + pvProviderExists bool backupLister listers.BackupLister backupListerSynced cache.InformerSynced restoreLister listers.RestoreLister restoreListerSynced cache.InformerSynced syncHandler func(restoreName string) error queue workqueue.RateLimitingInterface + logger *logrus.Logger } func NewRestoreController( @@ -68,6 +69,7 @@ func NewRestoreController( bucket string, backupInformer informers.BackupInformer, pvProviderExists bool, + logger *logrus.Logger, ) Interface { c := &restoreController{ restoreClient: restoreClient, @@ -81,6 +83,7 @@ func NewRestoreController( restoreLister: restoreInformer.Lister(), restoreListerSynced: restoreInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "restore"), + logger: logger, } c.syncHandler = c.processRestore @@ -94,13 +97,16 @@ func NewRestoreController( case "", api.RestorePhaseNew: // only process new restores default: - glog.V(4).Infof("Restore %s/%s has phase %s - skipping", restore.Namespace, restore.Name, restore.Status.Phase) + c.logger.WithFields(logrus.Fields{ + "restore": kubeutil.NamespaceAndName(restore), + "phase": restore.Status.Phase, + }).Debug("Restore is not new, skipping") return } key, err := cache.MetaNamespaceKeyFunc(restore) if err != nil { - glog.Errorf("error creating queue key for %#v: %v", restore, err) + c.logger.WithError(errors.WithStack(err)).WithField("restore", restore).Error("Error creating queue key, item not added to queue") return } c.queue.Add(key) @@ -118,7 +124,7 @@ func (controller *restoreController) Run(ctx context.Context, numWorkers int) er var wg sync.WaitGroup defer func() { - glog.Infof("Waiting for workers to finish their work") + controller.logger.Info("Waiting for workers to finish their work") controller.queue.ShutDown() @@ -127,17 +133,17 @@ func (controller *restoreController) Run(ctx context.Context, numWorkers int) er // we want to shut down the queue via defer and not at the end of the body. 
wg.Wait() - glog.Infof("All workers have finished") + controller.logger.Info("All workers have finished") }() - glog.Info("Starting RestoreController") - defer glog.Info("Shutting down RestoreController") + controller.logger.Info("Starting RestoreController") + defer controller.logger.Info("Shutting down RestoreController") - glog.Info("Waiting for caches to sync") + controller.logger.Info("Waiting for caches to sync") if !cache.WaitForCacheSync(ctx.Done(), controller.backupListerSynced, controller.restoreListerSynced) { return errors.New("timed out waiting for caches to sync") } - glog.Info("Caches are synced") + controller.logger.Info("Caches are synced") wg.Add(numWorkers) for i := 0; i < numWorkers; i++ { @@ -176,7 +182,7 @@ func (controller *restoreController) processNextWorkItem() bool { return true } - glog.Errorf("syncHandler error: %v", err) + controller.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue") // we had an error processing the item so add it back // into the queue for re-processing with rate-limiting controller.queue.AddRateLimited(key) @@ -185,18 +191,18 @@ func (controller *restoreController) processNextWorkItem() bool { } func (controller *restoreController) processRestore(key string) error { - glog.V(4).Infof("processRestore for key %q", key) + logContext := controller.logger.WithField("key", key) + + logContext.Debug("Running processRestore") ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - glog.V(4).Infof("error splitting key %q: %v", key, err) - return err + return errors.Wrap(err, "error splitting queue key") } - glog.V(4).Infof("Getting restore %s", key) + logContext.Debug("Getting Restore") restore, err := controller.restoreLister.Restores(ns).Get(name) if err != nil { - glog.V(4).Infof("error getting restore %s: %v", key, err) - return err + return errors.Wrap(err, "error getting Restore") } // TODO I think this is now unnecessary. 
We only initially place @@ -213,11 +219,10 @@ func (controller *restoreController) processRestore(key string) error { return nil } - glog.V(4).Infof("Cloning restore %s", key) + logContext.Debug("Cloning Restore") // don't modify items in the cache restore, err = cloneRestore(restore) if err != nil { - glog.V(4).Infof("error cloning restore %s: %v", key, err) return err } @@ -239,8 +244,7 @@ func (controller *restoreController) processRestore(key string) error { // update status updatedRestore, err := controller.restoreClient.Restores(ns).Update(restore) if err != nil { - glog.V(4).Infof("error updating status to %s: %v", restore.Status.Phase, err) - return err + return errors.Wrapf(err, "error updating Restore phase to %s", restore.Status.Phase) } restore = updatedRestore @@ -248,16 +252,16 @@ func (controller *restoreController) processRestore(key string) error { return nil } - glog.V(4).Infof("running restore for %s", key) + logContext.Debug("Running restore") // execution & upload of restore restore.Status.Warnings, restore.Status.Errors = controller.runRestore(restore, controller.bucket) - glog.V(4).Infof("restore %s completed", key) + logContext.Debug("restore completed") restore.Status.Phase = api.RestorePhaseCompleted - glog.V(4).Infof("updating restore %s final status", key) + logContext.Debug("Updating Restore final status") if _, err = controller.restoreClient.Restores(ns).Update(restore); err != nil { - glog.V(4).Infof("error updating restore %s final status: %v", key, err) + logContext.WithError(errors.WithStack(err)).Info("Error updating Restore final status") } return nil @@ -266,12 +270,12 @@ func (controller *restoreController) processRestore(key string) error { func cloneRestore(in interface{}) (*api.Restore, error) { clone, err := scheme.Scheme.DeepCopy(in) if err != nil { - return nil, err + return nil, errors.Wrap(err, "error deep-copying Restore") } out, ok := clone.(*api.Restore) if !ok { - return nil, fmt.Errorf("unexpected type: %T", clone) + return nil, errors.Errorf("unexpected type: %T", clone) } return out, nil @@ -306,13 +310,14 @@ func (controller *restoreController) fetchBackup(bucket, name string) (*api.Back } if !apierrors.IsNotFound(err) { - return nil, err + return nil, errors.WithStack(err) } - glog.V(4).Infof("Backup %q not found in backupLister, checking object storage directly.", name) + logContext := controller.logger.WithField("backupName", name) + + logContext.Debug("Backup not found in backupLister, checking object storage directly") backup, err = controller.backupService.GetBackup(bucket, name) if err != nil { - glog.V(4).Infof("Backup %q not found in object storage.", name) return nil, err } @@ -321,7 +326,7 @@ func (controller *restoreController) fetchBackup(bucket, name string) (*api.Back created, createErr := controller.backupClient.Backups(api.DefaultNamespace).Create(backup) if createErr != nil { - glog.Errorf("Unable to create API object for backup %q: %v", name, createErr) + logContext.WithError(errors.WithStack(createErr)).Error("Unable to create API object for Backup") } else { backup = created } @@ -329,64 +334,66 @@ func (controller *restoreController) fetchBackup(bucket, name string) (*api.Back return backup, nil } -func (controller *restoreController) runRestore(restore *api.Restore, bucket string) (warnings, errors api.RestoreResult) { +func (controller *restoreController) runRestore(restore *api.Restore, bucket string) (warnings, restoreErrors api.RestoreResult) { + logContext := controller.logger.WithField("restore", 
kubeutil.NamespaceAndName(restore)) + backup, err := controller.fetchBackup(bucket, restore.Spec.BackupName) if err != nil { - glog.Errorf("error getting backup: %v", err) - errors.Ark = append(errors.Ark, err.Error()) + logContext.WithError(err).WithField("backup", restore.Spec.BackupName).Error("Error getting backup") + restoreErrors.Ark = append(restoreErrors.Ark, err.Error()) return } - tmpFile, err := downloadToTempFile(restore.Spec.BackupName, controller.backupService, bucket) + tmpFile, err := downloadToTempFile(restore.Spec.BackupName, controller.backupService, bucket, controller.logger) if err != nil { - glog.Errorf("error downloading backup: %v", err) - errors.Ark = append(errors.Ark, err.Error()) + logContext.WithError(err).WithField("backup", restore.Spec.BackupName).Error("Error downloading backup") + restoreErrors.Ark = append(restoreErrors.Ark, err.Error()) return } logFile, err := ioutil.TempFile("", "") if err != nil { - glog.Errorf("error creating log temp file: %v", err) - errors.Ark = append(errors.Ark, err.Error()) + logContext.WithError(errors.WithStack(err)).Error("Error creating log temp file") + restoreErrors.Ark = append(restoreErrors.Ark, err.Error()) return } defer func() { if err := tmpFile.Close(); err != nil { - glog.Errorf("error closing %q: %v", tmpFile.Name(), err) + logContext.WithError(errors.WithStack(err)).WithField("file", tmpFile.Name()).Error("Error closing file") } if err := os.Remove(tmpFile.Name()); err != nil { - glog.Errorf("error removing %q: %v", tmpFile.Name(), err) + logContext.WithError(errors.WithStack(err)).WithField("file", tmpFile.Name()).Error("Error removing file") } if err := logFile.Close(); err != nil { - glog.Errorf("error closing %q: %v", logFile.Name(), err) + logContext.WithError(errors.WithStack(err)).WithField("file", logFile.Name()).Error("Error closing file") } if err := os.Remove(logFile.Name()); err != nil { - glog.Errorf("error removing %q: %v", logFile.Name(), err) + logContext.WithError(errors.WithStack(err)).WithField("file", logFile.Name()).Error("Error removing file") } }() - warnings, errors = controller.restorer.Restore(restore, backup, tmpFile, logFile) + warnings, restoreErrors = controller.restorer.Restore(restore, backup, tmpFile, logFile) // Try to upload the log file. This is best-effort. If we fail, we'll add to the ark errors. 
// Reset the offset to 0 for reading if _, err = logFile.Seek(0, 0); err != nil { - errors.Ark = append(errors.Ark, fmt.Sprintf("error resetting log file offset to 0: %v", err)) + restoreErrors.Ark = append(restoreErrors.Ark, fmt.Sprintf("error resetting log file offset to 0: %v", err)) return } if err := controller.backupService.UploadRestoreLog(bucket, restore.Spec.BackupName, restore.Name, logFile); err != nil { - errors.Ark = append(errors.Ark, fmt.Sprintf("error uploading log file to object storage: %v", err)) + restoreErrors.Ark = append(restoreErrors.Ark, fmt.Sprintf("error uploading log file to object storage: %v", err)) } return } -func downloadToTempFile(backupName string, backupService cloudprovider.BackupService, bucket string) (*os.File, error) { +func downloadToTempFile(backupName string, backupService cloudprovider.BackupService, bucket string, logger *logrus.Logger) (*os.File, error) { readCloser, err := backupService.DownloadBackup(bucket, backupName) if err != nil { return nil, err @@ -395,18 +402,23 @@ func downloadToTempFile(backupName string, backupService cloudprovider.BackupSer file, err := ioutil.TempFile("", backupName) if err != nil { - return nil, err + return nil, errors.Wrap(err, "error creating Backup temp file") } n, err := io.Copy(file, readCloser) if err != nil { - return nil, err + return nil, errors.Wrap(err, "error copying Backup to temp file") } - glog.V(4).Infof("copied %d bytes", n) + + logContext := logger.WithField("backup", backupName) + + logContext.WithFields(logrus.Fields{ + "fileName": file.Name(), + "bytes": n, + }).Debug("Copied Backup to file") if _, err := file.Seek(0, 0); err != nil { - glog.V(4).Infof("error seeking: %v", err) - return nil, err + return nil, errors.Wrap(err, "error resetting Backup file offset") } return file, nil diff --git a/pkg/controller/restore_controller_test.go b/pkg/controller/restore_controller_test.go index 98c144719..96dddef88 100644 --- a/pkg/controller/restore_controller_test.go +++ b/pkg/controller/restore_controller_test.go @@ -23,6 +23,7 @@ import ( "io/ioutil" "testing" + testlogger "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -74,6 +75,7 @@ func TestFetchBackup(t *testing.T) { restorer = &fakeRestorer{} sharedInformers = informers.NewSharedInformerFactory(client, 0) backupSvc = &BackupService{} + logger, _ = testlogger.NewNullLogger() ) c := NewRestoreController( @@ -85,6 +87,7 @@ func TestFetchBackup(t *testing.T) { "bucket", sharedInformers.Ark().V1().Backups(), false, + logger, ).(*restoreController) for _, itm := range test.informerBackups { @@ -260,14 +263,15 @@ func TestProcessRestore(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - client := fake.NewSimpleClientset() + var ( + client = fake.NewSimpleClientset() + restorer = &fakeRestorer{} + sharedInformers = informers.NewSharedInformerFactory(client, 0) + backupSvc = &BackupService{} + logger, _ = testlogger.NewNullLogger() + ) - restorer := &fakeRestorer{} defer restorer.AssertExpectations(t) - - sharedInformers := informers.NewSharedInformerFactory(client, 0) - - backupSvc := &BackupService{} defer backupSvc.AssertExpectations(t) c := NewRestoreController( @@ -279,6 +283,7 @@ func TestProcessRestore(t *testing.T) { "bucket", sharedInformers.Ark().V1().Backups(), test.allowRestoreSnapshots, + logger, ).(*restoreController) if test.restore != nil { diff --git a/pkg/controller/schedule_controller.go b/pkg/controller/schedule_controller.go 
index 7f16dea64..5d44d0644 100644 --- a/pkg/controller/schedule_controller.go +++ b/pkg/controller/schedule_controller.go @@ -18,13 +18,13 @@ package controller import ( "context" - "errors" "fmt" "sync" "time" - "github.com/golang/glog" + "github.com/pkg/errors" "github.com/robfig/cron" + "github.com/sirupsen/logrus" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -39,6 +39,7 @@ import ( arkv1client "github.com/heptio/ark/pkg/generated/clientset/typed/ark/v1" informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" + kubeutil "github.com/heptio/ark/pkg/util/kube" ) type scheduleController struct { @@ -50,6 +51,7 @@ type scheduleController struct { queue workqueue.RateLimitingInterface syncPeriod time.Duration clock clock.Clock + logger *logrus.Logger } func NewScheduleController( @@ -57,9 +59,10 @@ func NewScheduleController( backupsClient arkv1client.BackupsGetter, schedulesInformer informers.ScheduleInformer, syncPeriod time.Duration, + logger *logrus.Logger, ) *scheduleController { if syncPeriod < time.Minute { - glog.Infof("Schedule sync period %v is too short. Setting to 1 minute", syncPeriod) + logger.WithField("syncPeriod", syncPeriod).Info("Provided schedule sync period is too short. Setting to 1 minute") syncPeriod = time.Minute } @@ -71,6 +74,7 @@ func NewScheduleController( queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "schedule"), syncPeriod: syncPeriod, clock: clock.RealClock{}, + logger: logger, } c.syncHandler = c.processSchedule @@ -84,13 +88,16 @@ func NewScheduleController( case "", api.SchedulePhaseNew, api.SchedulePhaseEnabled: // add to work queue default: - glog.V(4).Infof("Schedule %s/%s has phase %s - skipping", schedule.Namespace, schedule.Name, schedule.Status.Phase) + c.logger.WithFields(logrus.Fields{ + "schedule": kubeutil.NamespaceAndName(schedule), + "phase": schedule.Status.Phase, + }).Debug("Schedule is not new, skipping") return } key, err := cache.MetaNamespaceKeyFunc(schedule) if err != nil { - glog.Errorf("error creating queue key for %#v: %v", schedule, err) + c.logger.WithError(errors.WithStack(err)).WithField("schedule", schedule).Error("Error creating queue key, item not added to queue") return } c.queue.Add(key) @@ -108,7 +115,7 @@ func (controller *scheduleController) Run(ctx context.Context, numWorkers int) e var wg sync.WaitGroup defer func() { - glog.Infof("Waiting for workers to finish their work") + controller.logger.Info("Waiting for workers to finish their work") controller.queue.ShutDown() @@ -117,17 +124,17 @@ func (controller *scheduleController) Run(ctx context.Context, numWorkers int) e // we want to shut down the queue via defer and not at the end of the body. 
wg.Wait() - glog.Infof("All workers have finished") + controller.logger.Info("All workers have finished") }() - glog.Info("Starting ScheduleController") - defer glog.Info("Shutting down ScheduleController") + controller.logger.Info("Starting ScheduleController") + defer controller.logger.Info("Shutting down ScheduleController") - glog.Info("Waiting for caches to sync") + controller.logger.Info("Waiting for caches to sync") if !cache.WaitForCacheSync(ctx.Done(), controller.schedulesListerSynced) { return errors.New("timed out waiting for caches to sync") } - glog.Info("Caches are synced") + controller.logger.Info("Caches are synced") wg.Add(numWorkers) for i := 0; i < numWorkers; i++ { @@ -146,7 +153,7 @@ func (controller *scheduleController) Run(ctx context.Context, numWorkers int) e func (controller *scheduleController) enqueueAllEnabledSchedules() { schedules, err := controller.schedulesLister.Schedules(api.DefaultNamespace).List(labels.NewSelector()) if err != nil { - glog.Errorf("error listing schedules: %v", err) + controller.logger.WithError(errors.WithStack(err)).Error("Error listing Schedules") return } @@ -157,7 +164,7 @@ func (controller *scheduleController) enqueueAllEnabledSchedules() { key, err := cache.MetaNamespaceKeyFunc(schedule) if err != nil { - glog.Errorf("error creating queue key for %#v: %v", schedule, err) + controller.logger.WithError(errors.WithStack(err)).WithField("schedule", schedule).Error("Error creating queue key, item not added to queue") continue } controller.queue.Add(key) @@ -188,7 +195,7 @@ func (controller *scheduleController) processNextWorkItem() bool { return true } - glog.Errorf("syncHandler error: %v", err) + controller.logger.WithError(err).WithField("key", key).Error("Error in syncHandler, re-adding item to queue") // we had an error processing the item so add it back // into the queue for re-processing with rate-limiting controller.queue.AddRateLimited(key) @@ -197,23 +204,23 @@ func (controller *scheduleController) processNextWorkItem() bool { } func (controller *scheduleController) processSchedule(key string) error { - glog.V(4).Infof("processSchedule for key %q", key) + logContext := controller.logger.WithField("key", key) + + logContext.Debug("Running processSchedule") ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - glog.V(4).Infof("error splitting key %q: %v", key, err) - return err + return errors.Wrap(err, "error splitting queue key") } - glog.V(4).Infof("Getting schedule %s", key) + logContext.Debug("Getting Schedule") schedule, err := controller.schedulesLister.Schedules(ns).Get(name) if err != nil { // schedule no longer exists if apierrors.IsNotFound(err) { - glog.V(4).Infof("schedule %s not found: %v", key, err) + logContext.WithError(err).Debug("Schedule not found") return nil } - glog.V(4).Infof("error getting schedule %s: %v", key, err) - return err + return errors.Wrap(err, "error getting Schedule") } switch schedule.Status.Phase { @@ -223,11 +230,10 @@ func (controller *scheduleController) processSchedule(key string) error { return nil } - glog.V(4).Infof("Cloning schedule %s", key) + logContext.Debug("Cloning schedule") // don't modify items in the cache schedule, err = cloneSchedule(schedule) if err != nil { - glog.V(4).Infof("error cloning schedule %s: %v", key, err) return err } @@ -235,7 +241,7 @@ func (controller *scheduleController) processSchedule(key string) error { // so re-validate currentPhase := schedule.Status.Phase - cronSchedule, errs := parseCronSchedule(schedule) + cronSchedule, errs := 
parseCronSchedule(schedule, controller.logger) if len(errs) > 0 { schedule.Status.Phase = api.SchedulePhaseFailedValidation schedule.Status.ValidationErrors = errs @@ -247,8 +253,7 @@ func (controller *scheduleController) processSchedule(key string) error { if currentPhase != schedule.Status.Phase { updatedSchedule, err := controller.schedulesClient.Schedules(ns).Update(schedule) if err != nil { - glog.V(4).Infof("error updating status to %s: %v", schedule.Status.Phase, err) - return err + return errors.Wrapf(err, "error updating Schedule phase to %s", schedule.Status.Phase) } schedule = updatedSchedule } @@ -259,7 +264,6 @@ func (controller *scheduleController) processSchedule(key string) error { // check for the schedule being due to run, and submit a Backup if so if err := controller.submitBackupIfDue(schedule, cronSchedule); err != nil { - glog.V(4).Infof("error processing Schedule %v/%v: err=%v", schedule.Namespace, schedule.Name, err) return err } @@ -269,18 +273,18 @@ func (controller *scheduleController) processSchedule(key string) error { func cloneSchedule(in interface{}) (*api.Schedule, error) { clone, err := scheme.Scheme.DeepCopy(in) if err != nil { - return nil, err + return nil, errors.Wrap(err, "error deep-copying Schedule") } out, ok := clone.(*api.Schedule) if !ok { - return nil, fmt.Errorf("unexpected type: %T", clone) + return nil, errors.Errorf("unexpected type: %T", clone) } return out, nil } -func parseCronSchedule(itm *api.Schedule) (cron.Schedule, []string) { +func parseCronSchedule(itm *api.Schedule, logger *logrus.Logger) (cron.Schedule, []string) { var validationErrors []string var schedule cron.Schedule @@ -290,18 +294,23 @@ func parseCronSchedule(itm *api.Schedule) (cron.Schedule, []string) { return nil, validationErrors } + logContext := logger.WithField("schedule", kubeutil.NamespaceAndName(itm)) + // adding a recover() around cron.Parse because it panics on empty string and is possible // that it panics under other scenarios as well. 
func() { defer func() { if r := recover(); r != nil { - glog.V(4).Infof("panic parsing schedule %v/%v, cron schedule=%v: %v", itm.Namespace, itm.Name, itm.Spec.Schedule, r) + logContext.WithFields(logrus.Fields{ + "schedule": itm.Spec.Schedule, + "recover": r, + }).Debug("Panic parsing schedule") validationErrors = append(validationErrors, fmt.Sprintf("invalid schedule: %v", r)) } }() if res, err := cron.ParseStandard(itm.Spec.Schedule); err != nil { - glog.V(4).Infof("error parsing schedule %v/%v, cron schedule=%v: %v", itm.Namespace, itm.Name, itm.Spec.Schedule, err) + logContext.WithError(errors.WithStack(err)).WithField("schedule", itm.Spec.Schedule).Debug("Error parsing schedule") validationErrors = append(validationErrors, fmt.Sprintf("invalid schedule: %v", err)) } else { schedule = res @@ -316,12 +325,14 @@ func parseCronSchedule(itm *api.Schedule) (cron.Schedule, []string) { } func (controller *scheduleController) submitBackupIfDue(item *api.Schedule, cronSchedule cron.Schedule) error { - now := controller.clock.Now() - - isDue, nextRunTime := getNextRunTime(item, cronSchedule, now) + var ( + now = controller.clock.Now() + isDue, nextRunTime = getNextRunTime(item, cronSchedule, now) + logContext = controller.logger.WithField("schedule", kubeutil.NamespaceAndName(item)) + ) if !isDue { - glog.Infof("Next run time for %v/%v is %v, skipping...", item.Namespace, item.Name, nextRunTime) + logContext.WithField("nextRunTime", nextRunTime).Info("Schedule is not due, skipping") return nil } @@ -331,25 +342,21 @@ func (controller *scheduleController) submitBackupIfDue(item *api.Schedule, cron // It might also make sense in the future to explicitly check for currently-running // backups so that we don't overlap runs (for disk snapshots in particular, this can // lead to performance issues). 
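
As a standalone illustration (not part of this patch), the sketch below isolates the recover() guard that parseCronSchedule keeps around the cron parser while its diagnostics move to logrus: a panic inside the cron library is turned into a validation error instead of crashing the controller. The name parseSpec is illustrative only and does not exist in the Ark codebase.

package main

// Standalone sketch of the recover-guard pattern used by parseCronSchedule.

import (
	"fmt"

	"github.com/robfig/cron"
	"github.com/sirupsen/logrus"
)

// parseSpec parses a cron expression, converting any panic from the cron
// library into a validation error rather than letting it propagate.
func parseSpec(spec string, logger *logrus.Logger) (cron.Schedule, []string) {
	var (
		schedule cron.Schedule
		errs     []string
	)

	func() {
		defer func() {
			if r := recover(); r != nil {
				logger.WithField("schedule", spec).Debug("Panic parsing schedule")
				errs = append(errs, fmt.Sprintf("invalid schedule: %v", r))
			}
		}()

		if res, err := cron.ParseStandard(spec); err != nil {
			logger.WithError(err).WithField("schedule", spec).Debug("Error parsing schedule")
			errs = append(errs, fmt.Sprintf("invalid schedule: %v", err))
		} else {
			schedule = res
		}
	}()

	return schedule, errs
}

func main() {
	logger := logrus.New()

	if _, errs := parseSpec("@daily", logger); len(errs) == 0 {
		fmt.Println("valid schedule")
	}
	if _, errs := parseSpec("not a cron expression", logger); len(errs) > 0 {
		fmt.Println(errs) // reported as a validation error, not a crash
	}
}
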
- - glog.Infof("Next run time for %v/%v is %v, submitting Backup...", item.Namespace, item.Name, nextRunTime) + logContext.WithField("nextRunTime", nextRunTime).Info("Schedule is due, submitting Backup") backup := getBackup(item, now) if _, err := controller.backupsClient.Backups(backup.Namespace).Create(backup); err != nil { - glog.V(4).Infof("error creating Backup: %v", err) - return err + return errors.Wrap(err, "error creating Backup") } schedule, err := cloneSchedule(item) if err != nil { - glog.V(4).Infof("error cloning Schedule %v/%v: %v", item.Namespace, item.Name, err) return err } schedule.Status.LastBackup = metav1.NewTime(now) if _, err := controller.schedulesClient.Schedules(schedule.Namespace).Update(schedule); err != nil { - glog.V(4).Infof("error updating LastBackup for Schedule %v/%v: %v", schedule.Namespace, schedule.Name, err) - return err + return errors.Wrapf(err, "error updating Schedule's LastBackup time to %v", schedule.Status.LastBackup) } return nil diff --git a/pkg/controller/schedule_controller_test.go b/pkg/controller/schedule_controller_test.go index 240e4152d..b63af3422 100644 --- a/pkg/controller/schedule_controller_test.go +++ b/pkg/controller/schedule_controller_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/robfig/cron" + testlogger "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -123,6 +124,7 @@ func TestProcessSchedule(t *testing.T) { var ( client = fake.NewSimpleClientset() sharedInformers = informers.NewSharedInformerFactory(client, 0) + logger, _ = testlogger.NewNullLogger() ) c := NewScheduleController( @@ -130,6 +132,7 @@ func TestProcessSchedule(t *testing.T) { client.ArkV1(), sharedInformers.Ark().V1().Schedules(), time.Duration(0), + logger, ) var ( @@ -291,7 +294,9 @@ func TestParseCronSchedule(t *testing.T) { }, } - c, errs := parseCronSchedule(s) + logger, _ := testlogger.NewNullLogger() + + c, errs := parseCronSchedule(s, logger) require.Empty(t, errs) // make sure we're not due and next backup is tomorrow at 9am diff --git a/pkg/discovery/helper.go b/pkg/discovery/helper.go index 8b7877a42..496f9e56f 100644 --- a/pkg/discovery/helper.go +++ b/pkg/discovery/helper.go @@ -21,6 +21,8 @@ import ( "sync" kcmdutil "github.com/heptio/ark/third_party/kubernetes/pkg/kubectl/cmd/util" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -52,6 +54,7 @@ type Helper interface { type helper struct { discoveryClient discovery.DiscoveryInterface + logger *logrus.Logger // lock guards mapper and resources lock sync.RWMutex @@ -61,7 +64,7 @@ type helper struct { var _ Helper = &helper{} -func NewHelper(discoveryClient discovery.DiscoveryInterface) (Helper, error) { +func NewHelper(discoveryClient discovery.DiscoveryInterface, logger *logrus.Logger) (Helper, error) { h := &helper{ discoveryClient: discoveryClient, } @@ -77,18 +80,18 @@ func (h *helper) Refresh() error { groupResources, err := discovery.GetAPIGroupResources(h.discoveryClient) if err != nil { - return err + return errors.WithStack(err) } mapper := discovery.NewRESTMapper(groupResources, dynamic.VersionInterfaces) - shortcutExpander, err := kcmdutil.NewShortcutExpander(mapper, h.discoveryClient) + shortcutExpander, err := kcmdutil.NewShortcutExpander(mapper, h.discoveryClient, h.logger) if err != nil { - return err + return errors.WithStack(err) } h.mapper = shortcutExpander preferredResources, err := 
h.discoveryClient.ServerPreferredResources() if err != nil { - return err + return errors.WithStack(err) } h.resources = discovery.FilteredBy( diff --git a/pkg/restore/resource_waiter.go b/pkg/restore/resource_waiter.go index 896b06017..cac81a957 100644 --- a/pkg/restore/resource_waiter.go +++ b/pkg/restore/resource_waiter.go @@ -17,10 +17,10 @@ limitations under the License. package restore import ( - "errors" - "fmt" "time" + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -69,7 +69,7 @@ func (rw *resourceWaiter) Wait() error { case event := <-rw.watchChan: obj, ok := event.Object.(*unstructured.Unstructured) if !ok { - return fmt.Errorf("Unexpected type %T", event.Object) + return errors.Errorf("Unexpected type %T", event.Object) } if event.Type == watch.Added || event.Type == watch.Modified { diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go index 6c8805851..8886aab59 100644 --- a/pkg/restore/restore.go +++ b/pkg/restore/restore.go @@ -28,6 +28,8 @@ import ( "sort" "time" + "github.com/sirupsen/logrus" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -37,7 +39,6 @@ import ( corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/pkg/api/v1" - "github.com/golang/glog" api "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/client" "github.com/heptio/ark/pkg/cloudprovider" @@ -69,12 +70,13 @@ type kubernetesRestorer struct { namespaceClient corev1.NamespaceInterface resourcePriorities []string fileSystem FileSystem + logger *logrus.Logger } // prioritizeResources takes a list of pre-prioritized resources and a full list of resources to restore, // and returns an ordered list of GroupResource-resolved resources in the order that they should be // restored. 
-func prioritizeResources(helper discovery.Helper, priorities []string, includedResources *collections.IncludesExcludes) ([]schema.GroupResource, error) { +func prioritizeResources(helper discovery.Helper, priorities []string, includedResources *collections.IncludesExcludes, logger *logrus.Logger) ([]schema.GroupResource, error) { var ret []schema.GroupResource // set keeps track of resolved GroupResource names @@ -88,7 +90,7 @@ func prioritizeResources(helper discovery.Helper, priorities []string, includedR } if !includedResources.ShouldInclude(gr.String()) { - glog.Infof("Not including resource %v", gr) + logger.WithField("groupResource", gr).Info("Not including resource") continue } @@ -109,7 +111,7 @@ func prioritizeResources(helper discovery.Helper, priorities []string, includedR gr := groupVersion.WithResource(resource.Name).GroupResource() if !includedResources.ShouldInclude(gr.String()) { - glog.Infof("Not including resource %v", gr) + logger.WithField("groupResource", gr).Info("Not including resource") continue } @@ -139,6 +141,7 @@ func NewKubernetesRestorer( resourcePriorities []string, backupClient arkv1client.BackupsGetter, namespaceClient corev1.NamespaceInterface, + logger *logrus.Logger, ) (Restorer, error) { r := make(map[schema.GroupResource]restorers.ResourceRestorer) for gr, restorer := range customRestorers { @@ -158,6 +161,7 @@ func NewKubernetesRestorer( namespaceClient: namespaceClient, resourcePriorities: resourcePriorities, fileSystem: &osFileSystem{}, + logger: logger, }, nil } @@ -183,17 +187,18 @@ func (kr *kubernetesRestorer) Restore(restore *api.Restore, backup *api.Backup, resourceIncludesExcludes := collections.GenerateIncludesExcludes( restore.Spec.IncludedResources, restore.Spec.ExcludedResources, - func(item string) (string, error) { + func(item string) string { gr, err := kr.discoveryHelper.ResolveGroupResource(item) if err != nil { - return "", err + kr.logger.WithError(err).WithField("resource", item).Error("Unable to resolve resource") + return "" } - return gr.String(), nil + return gr.String() }, ) - prioritizedResources, err := prioritizeResources(kr.discoveryHelper, kr.resourcePriorities, resourceIncludesExcludes) + prioritizedResources, err := prioritizeResources(kr.discoveryHelper, kr.resourcePriorities, resourceIncludesExcludes, kr.logger) if err != nil { return api.RestoreResult{}, api.RestoreResult{Ark: []string{err.Error()}} } diff --git a/pkg/restore/restore_test.go b/pkg/restore/restore_test.go index bf23ea72a..09eaab7d0 100644 --- a/pkg/restore/restore_test.go +++ b/pkg/restore/restore_test.go @@ -23,6 +23,7 @@ import ( "os" "testing" + "github.com/sirupsen/logrus/hooks/test" "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -80,6 +81,8 @@ func TestPrioritizeResources(t *testing.T) { }, } + logger, _ := test.NewNullLogger() + for _, test := range tests { t.Run(test.name, func(t *testing.T) { helper := &FakeDiscoveryHelper{RESTMapper: &FakeMapper{AutoReturnResource: true}} @@ -94,7 +97,7 @@ func TestPrioritizeResources(t *testing.T) { includesExcludes := collections.NewIncludesExcludes().Includes(test.includes...).Excludes(test.excludes...) 
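
The GenerateIncludesExcludes call sites above now pass a mapping function that returns a plain string, doing its own logging and returning "" to signal that an item should be skipped. The standalone sketch below (not part of this patch; resolve and makeMapFunc are illustrative stand-ins for helper.ResolveGroupResource and the inline closures in the patch) shows that contract in isolation.

package main

// Standalone sketch of the new func(string) string mapping contract.

import (
	"fmt"
	"strings"

	"github.com/sirupsen/logrus"
)

// resolve pretends to resolve a user-supplied resource name to a canonical one.
func resolve(item string) (string, error) {
	if strings.TrimSpace(item) == "" {
		return "", fmt.Errorf("empty resource name")
	}
	return strings.ToLower(item), nil
}

// makeMapFunc builds a mapping function suitable for GenerateIncludesExcludes:
// unresolvable items are logged and mapped to "", which the caller then skips.
func makeMapFunc(logger *logrus.Logger) func(string) string {
	return func(item string) string {
		resolved, err := resolve(item)
		if err != nil {
			logger.WithError(err).WithField("resource", item).Error("Unable to resolve resource")
			return ""
		}
		return resolved
	}
}

func main() {
	logger := logrus.New()
	mapFunc := makeMapFunc(logger)

	fmt.Printf("%q\n", mapFunc("Pods")) // "pods"
	fmt.Printf("%q\n", mapFunc("  "))   // "" (an error is logged, the item is skipped)
}
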
- result, err := prioritizeResources(helper, test.priorities, includesExcludes) + result, err := prioritizeResources(helper, test.priorities, includesExcludes, logger) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/pkg/restore/restorers/job_restorer.go b/pkg/restore/restorers/job_restorer.go index 90f8e5960..1d213d0c2 100644 --- a/pkg/restore/restorers/job_restorer.go +++ b/pkg/restore/restorers/job_restorer.go @@ -17,7 +17,7 @@ limitations under the License. package restorers import ( - "github.com/golang/glog" + "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/runtime" @@ -25,12 +25,16 @@ import ( "github.com/heptio/ark/pkg/util/collections" ) -type jobRestorer struct{} +type jobRestorer struct { + logger *logrus.Logger +} var _ ResourceRestorer = &jobRestorer{} -func NewJobRestorer() ResourceRestorer { - return &jobRestorer{} +func NewJobRestorer(logger *logrus.Logger) ResourceRestorer { + return &jobRestorer{ + logger: logger, + } } func (r *jobRestorer) Handles(obj runtime.Unstructured, restore *api.Restore) bool { @@ -38,25 +42,25 @@ func (r *jobRestorer) Handles(obj runtime.Unstructured, restore *api.Restore) bo } func (r *jobRestorer) Prepare(obj runtime.Unstructured, restore *api.Restore, backup *api.Backup) (runtime.Unstructured, error, error) { - glog.V(4).Infof("resetting metadata and status") + r.logger.Debug("resetting metadata and status") _, err := resetMetadataAndStatus(obj, true) if err != nil { return nil, nil, err } - glog.V(4).Infof("getting spec.selector.matchLabels") - matchLabels, err := collections.GetMap(obj.UnstructuredContent(), "spec.selector.matchLabels") - if err != nil { - glog.V(4).Infof("unable to get spec.selector.matchLabels: %v", err) - } else { - delete(matchLabels, "controller-uid") + fieldDeletions := map[string]string{ + "spec.selector.matchLabels": "controller-uid", + "spec.template.metadata.labels": "controller-uid", } - templateLabels, err := collections.GetMap(obj.UnstructuredContent(), "spec.template.metadata.labels") - if err != nil { - glog.V(4).Infof("unable to get spec.template.metadata.labels: %v", err) - } else { - delete(templateLabels, "controller-uid") + for k, v := range fieldDeletions { + r.logger.Debugf("Getting %s", k) + labels, err := collections.GetMap(obj.UnstructuredContent(), k) + if err != nil { + r.logger.WithError(err).Debugf("Unable to get %s", k) + } else { + delete(labels, v) + } } return obj, nil, nil diff --git a/pkg/restore/restorers/job_restorer_test.go b/pkg/restore/restorers/job_restorer_test.go index 63094418c..5cbe4e2d7 100644 --- a/pkg/restore/restorers/job_restorer_test.go +++ b/pkg/restore/restorers/job_restorer_test.go @@ -19,6 +19,7 @@ package restorers import ( "testing" + testlogger "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/runtime" @@ -126,7 +127,10 @@ func TestJobRestorerPrepare(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - restorer := NewJobRestorer() + var ( + logger, _ = testlogger.NewNullLogger() + restorer = NewJobRestorer(logger) + ) res, _, err := restorer.Prepare(test.obj, nil, nil) diff --git a/pkg/restore/restorers/pod_restorer.go b/pkg/restore/restorers/pod_restorer.go index 072c1ce15..638632be8 100644 --- a/pkg/restore/restorers/pod_restorer.go +++ b/pkg/restore/restorers/pod_restorer.go @@ -19,7 +19,7 @@ package restorers import ( "regexp" - "github.com/golang/glog" + "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/runtime" @@ -27,12 +27,16 @@ import ( 
"github.com/heptio/ark/pkg/util/collections" ) -type podRestorer struct{} +type podRestorer struct { + logger *logrus.Logger +} var _ ResourceRestorer = &podRestorer{} -func NewPodRestorer() ResourceRestorer { - return &podRestorer{} +func NewPodRestorer(logger *logrus.Logger) ResourceRestorer { + return &podRestorer{ + logger: logger, + } } func (nsr *podRestorer) Handles(obj runtime.Unstructured, restore *api.Restore) bool { @@ -43,37 +47,36 @@ var ( defaultTokenRegex = regexp.MustCompile("default-token-.*") ) -func (nsr *podRestorer) Prepare(obj runtime.Unstructured, restore *api.Restore, backup *api.Backup) (runtime.Unstructured, error, error) { - glog.V(4).Infof("resetting metadata and status") +func (r *podRestorer) Prepare(obj runtime.Unstructured, restore *api.Restore, backup *api.Backup) (runtime.Unstructured, error, error) { + r.logger.Debug("resetting metadata and status") _, err := resetMetadataAndStatus(obj, true) if err != nil { return nil, nil, err } - glog.V(4).Infof("getting spec") + r.logger.Debug("getting spec") spec, err := collections.GetMap(obj.UnstructuredContent(), "spec") if err != nil { return nil, nil, err } - glog.V(4).Infof("deleting spec.NodeName") + r.logger.Debug("deleting spec.NodeName") delete(spec, "nodeName") newVolumes := make([]interface{}, 0) - glog.V(4).Infof("iterating over volumes") + r.logger.Debug("iterating over volumes") err = collections.ForEach(spec, "volumes", func(volume map[string]interface{}) error { name, err := collections.GetString(volume, "name") if err != nil { return err } - glog.V(4).Infof("checking volume with name %q", name) - + r.logger.WithField("volumeName", name).Debug("Checking volume") if !defaultTokenRegex.MatchString(name) { - glog.V(4).Infof("preserving volume") + r.logger.WithField("volumeName", name).Debug("Preserving volume") newVolumes = append(newVolumes, volume) } else { - glog.V(4).Infof("excluding volume") + r.logger.WithField("volumeName", name).Debug("Excluding volume") } return nil @@ -82,10 +85,10 @@ func (nsr *podRestorer) Prepare(obj runtime.Unstructured, restore *api.Restore, return nil, nil, err } - glog.V(4).Infof("setting spec.volumes") + r.logger.Debug("Setting spec.volumes") spec["volumes"] = newVolumes - glog.V(4).Infof("iterating over containers") + r.logger.Debug("iterating over containers") err = collections.ForEach(spec, "containers", func(container map[string]interface{}) error { var newVolumeMounts []interface{} err := collections.ForEach(container, "volumeMounts", func(volumeMount map[string]interface{}) error { @@ -94,13 +97,12 @@ func (nsr *podRestorer) Prepare(obj runtime.Unstructured, restore *api.Restore, return err } - glog.V(4).Infof("checking volumeMount with name %q", name) - + r.logger.WithField("volumeMount", name).Debug("Checking volumeMount") if !defaultTokenRegex.MatchString(name) { - glog.V(4).Infof("preserving volumeMount") + r.logger.WithField("volumeMount", name).Debug("Preserving volumeMount") newVolumeMounts = append(newVolumeMounts, volumeMount) } else { - glog.V(4).Infof("excluding volumeMount") + r.logger.WithField("volumeMount", name).Debug("Excluding volumeMount") } return nil diff --git a/pkg/restore/restorers/pod_restorer_test.go b/pkg/restore/restorers/pod_restorer_test.go index ac9027c47..71e1e1547 100644 --- a/pkg/restore/restorers/pod_restorer_test.go +++ b/pkg/restore/restorers/pod_restorer_test.go @@ -19,6 +19,7 @@ package restorers import ( "testing" + testlogger "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" 
"k8s.io/apimachinery/pkg/runtime" @@ -96,7 +97,10 @@ func TestPodRestorerPrepare(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - restorer := NewPodRestorer() + var ( + logger, _ = testlogger.NewNullLogger() + restorer = NewPodRestorer(logger) + ) res, _, err := restorer.Prepare(test.obj, nil, nil) diff --git a/pkg/restore/restorers/pv_restorer.go b/pkg/restore/restorers/pv_restorer.go index d255738e3..af8e57ff6 100644 --- a/pkg/restore/restorers/pv_restorer.go +++ b/pkg/restore/restorers/pv_restorer.go @@ -17,8 +17,7 @@ limitations under the License. package restorers import ( - "errors" - "fmt" + "github.com/pkg/errors" "k8s.io/apimachinery/pkg/runtime" @@ -83,7 +82,7 @@ func (sr *persistentVolumeRestorer) Prepare(obj runtime.Unstructured, restore *a // if there are snapshots, and this is a supported PV type, but there's no // snapshot for this PV, it's an error if backup.Status.VolumeBackups[pvName] == nil { - return nil, nil, fmt.Errorf("no snapshot found to restore volume %s from", pvName) + return nil, nil, errors.Errorf("no snapshot found to restore volume %s from", pvName) } restoreFromSnapshot = true diff --git a/pkg/util/collections/includes_excludes.go b/pkg/util/collections/includes_excludes.go index 71d00a2bc..62d165968 100644 --- a/pkg/util/collections/includes_excludes.go +++ b/pkg/util/collections/includes_excludes.go @@ -17,10 +17,7 @@ limitations under the License. package collections import ( - "errors" - "fmt" - - "github.com/golang/glog" + "github.com/pkg/errors" "k8s.io/apimachinery/pkg/util/sets" ) @@ -102,7 +99,7 @@ func ValidateIncludesExcludes(includesList, excludesList []string) []error { for _, itm := range excludes.List() { if includes.Has(itm) { - errs = append(errs, fmt.Errorf("excludes list cannot contain an item in the includes list: %v", itm)) + errs = append(errs, errors.Errorf("excludes list cannot contain an item in the includes list: %v", itm)) } } @@ -113,7 +110,7 @@ func ValidateIncludesExcludes(includesList, excludesList []string) []error { // include/exclude slices, applying the specified mapping function to each item in them, // and adding the output of the function to the new struct. If the mapping function returns // an error for an item, it is omitted from the result. -func GenerateIncludesExcludes(includes, excludes []string, mapFunc func(string) (string, error)) *IncludesExcludes { +func GenerateIncludesExcludes(includes, excludes []string, mapFunc func(string) string) *IncludesExcludes { res := NewIncludesExcludes() for _, item := range includes { @@ -122,22 +119,18 @@ func GenerateIncludesExcludes(includes, excludes []string, mapFunc func(string) continue } - key, err := mapFunc(item) - if err != nil { - glog.Errorf("unable to include item %q: %v", item, err) + key := mapFunc(item) + if key == "" { continue } - res.Includes(key) } for _, item := range excludes { - key, err := mapFunc(item) - if err != nil { - glog.Errorf("unable to exclude item %q: %v", item, err) + key := mapFunc(item) + if key == "" { continue } - res.Excludes(key) } diff --git a/pkg/util/collections/includes_excludes_test.go b/pkg/util/collections/includes_excludes_test.go index cf6206b81..bc87746b5 100644 --- a/pkg/util/collections/includes_excludes_test.go +++ b/pkg/util/collections/includes_excludes_test.go @@ -17,10 +17,12 @@ limitations under the License. 
package collections import ( - "errors" "testing" "github.com/stretchr/testify/assert" + + "github.com/pkg/errors" + "github.com/stretchr/testify/require" ) func TestShouldInclude(t *testing.T) { @@ -126,7 +128,11 @@ func TestValidateIncludesExcludes(t *testing.T) { t.Run(test.name, func(t *testing.T) { res := ValidateIncludesExcludes(test.includes, test.excludes) - assert.Equal(t, test.expected, res) + require.Equal(t, len(test.expected), len(res)) + + for i := 0; i < len(test.expected); i++ { + assert.Equal(t, test.expected[i].Error(), res[i].Error()) + } }) } } diff --git a/pkg/util/collections/map_utils.go b/pkg/util/collections/map_utils.go index 851d4ca0d..0c95e8ff1 100644 --- a/pkg/util/collections/map_utils.go +++ b/pkg/util/collections/map_utils.go @@ -17,9 +17,9 @@ limitations under the License. package collections import ( - "errors" - "fmt" "strings" + + "github.com/pkg/errors" ) // GetValue returns the object at root[path], where path is a dot separated string. @@ -33,7 +33,7 @@ func GetValue(root map[string]interface{}, path string) (interface{}, error) { obj, found := root[pathParts[0]] if !found { - return "", fmt.Errorf("key %v not found", pathParts[0]) + return "", errors.Errorf("key %v not found", pathParts[0]) } if len(pathParts) == 1 { @@ -42,7 +42,7 @@ func GetValue(root map[string]interface{}, path string) (interface{}, error) { subMap, ok := obj.(map[string]interface{}) if !ok { - return "", fmt.Errorf("value at key %v is not a map[string]interface{}", key) + return "", errors.Errorf("value at key %v is not a map[string]interface{}", key) } return GetValue(subMap, strings.Join(pathParts[1:], ".")) @@ -57,7 +57,7 @@ func GetString(root map[string]interface{}, path string) (string, error) { str, ok := obj.(string) if !ok { - return "", fmt.Errorf("value at path %v is not a string", path) + return "", errors.Errorf("value at path %v is not a string", path) } return str, nil @@ -72,7 +72,7 @@ func GetMap(root map[string]interface{}, path string) (map[string]interface{}, e ret, ok := obj.(map[string]interface{}) if !ok { - return nil, fmt.Errorf("value at path %v is not a map[string]interface{}", path) + return nil, errors.Errorf("value at path %v is not a map[string]interface{}", path) } return ret, nil @@ -87,7 +87,7 @@ func GetSlice(root map[string]interface{}, path string) ([]interface{}, error) { ret, ok := obj.([]interface{}) if !ok { - return nil, fmt.Errorf("value at path %v is not a []interface{}", path) + return nil, errors.Errorf("value at path %v is not a []interface{}", path) } return ret, nil @@ -103,7 +103,7 @@ func ForEach(root map[string]interface{}, path string, fn func(obj map[string]in for i := range s { obj, ok := s[i].(map[string]interface{}) if !ok { - return fmt.Errorf("unable to convert %s[%d] to an object", path, i) + return errors.Errorf("unable to convert %s[%d] to an object", path, i) } if err := fn(obj); err != nil { return err diff --git a/pkg/util/encode/encode.go b/pkg/util/encode/encode.go index 727a409d8..1e95e9b43 100644 --- a/pkg/util/encode/encode.go +++ b/pkg/util/encode/encode.go @@ -21,6 +21,8 @@ import ( "fmt" "io" + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/runtime" "github.com/heptio/ark/pkg/apis/ark/v1" @@ -46,7 +48,7 @@ func EncodeTo(obj runtime.Object, format string, w io.Writer) error { return err } - return encoder.Encode(obj, w) + return errors.WithStack(encoder.Encode(obj, w)) } // EncoderFor gets the appropriate encoder for the specified format. 
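
The collections and encode changes above replace fmt.Errorf with errors.Errorf and wrap returned errors with errors.WithStack, which is presumably why the includes/excludes test now compares err.Error() strings rather than the error values themselves: pkg/errors values carry stack traces, so two errors with identical messages are not deeply equal. The standalone sketch below (not part of this patch) illustrates that behavior.

package main

// Standalone sketch of the pkg/errors behavior behind the changes above.

import (
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	a := errors.Errorf("excludes list cannot contain an item in the includes list: %v", "pods")
	b := errors.Errorf("excludes list cannot contain an item in the includes list: %v", "pods")

	// The messages match even though the values differ (each carries its own stack).
	fmt.Println(a.Error() == b.Error()) // true

	// Wrap adds context and a stack trace while preserving the original cause.
	wrapped := errors.Wrap(a, "error validating includes/excludes")
	fmt.Println(wrapped.Error())            // "error validating includes/excludes: excludes list ..."
	fmt.Println(errors.Cause(wrapped) == a) // true

	// WithStack annotates an error without changing its message, and passing nil
	// stays nil, so "return errors.WithStack(f())" is safe even when f succeeds.
	fmt.Println(errors.WithStack(a).Error() == a.Error()) // true
	fmt.Println(errors.WithStack(nil) == nil)             // true
}
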
@@ -55,7 +57,7 @@ func EncoderFor(format string) (runtime.Encoder, error) { desiredMediaType := fmt.Sprintf("application/%s", format) serializerInfo, found := runtime.SerializerInfoForMediaType(scheme.Codecs.SupportedMediaTypes(), desiredMediaType) if !found { - return nil, fmt.Errorf("unable to locate an encoder for %q", desiredMediaType) + return nil, errors.Errorf("unable to locate an encoder for %q", desiredMediaType) } if serializerInfo.PrettySerializer != nil { encoder = serializerInfo.PrettySerializer diff --git a/pkg/util/kube/utils.go b/pkg/util/kube/utils.go index af8bbb699..5a7171cf5 100644 --- a/pkg/util/kube/utils.go +++ b/pkg/util/kube/utils.go @@ -17,11 +17,12 @@ limitations under the License. package kube import ( - "errors" "fmt" "regexp" "strings" + "github.com/pkg/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -50,7 +51,7 @@ func EnsureNamespaceExists(namespace *v1.Namespace, client corev1.NamespaceInter } else if apierrors.IsAlreadyExists(err) { return false, nil } else { - return false, err + return false, errors.Wrapf(err, "error creating namespace %s", namespace.Name) } } diff --git a/third_party/kubernetes/pkg/kubectl/cmd/util/shortcut_expander.go b/third_party/kubernetes/pkg/kubectl/cmd/util/shortcut_expander.go index d4b8468fc..b101059da 100644 --- a/third_party/kubernetes/pkg/kubectl/cmd/util/shortcut_expander.go +++ b/third_party/kubernetes/pkg/kubectl/cmd/util/shortcut_expander.go @@ -20,7 +20,7 @@ import ( "errors" "strings" - "github.com/golang/glog" + "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" @@ -39,11 +39,12 @@ type shortcutExpander struct { RESTMapper meta.RESTMapper discoveryClient discovery.DiscoveryInterface + logger *logrus.Logger } var _ meta.RESTMapper = &shortcutExpander{} -func NewShortcutExpander(delegate meta.RESTMapper, client discovery.DiscoveryInterface) (shortcutExpander, error) { +func NewShortcutExpander(delegate meta.RESTMapper, client discovery.DiscoveryInterface, logger *logrus.Logger) (shortcutExpander, error) { if client == nil { return shortcutExpander{}, errors.New("Please provide discovery client to shortcut expander") } @@ -96,7 +97,7 @@ func (e shortcutExpander) getShortcutMappings() ([]ResourceShortcuts, error) { for _, shortName := range apiRes.ShortNames { gv, err := schema.ParseGroupVersion(apiResources.GroupVersion) if err != nil { - glog.V(1).Infof("Unable to parse groupversion = %s due to = %s", apiResources.GroupVersion, err.Error()) + e.logger.WithError(err).WithField("groupVersion", apiResources.GroupVersion).Error("Unable to parse groupversion") continue } rs := ResourceShortcuts{