From 03dde45c09c6a2ed5f2f6620467d583ca1342a7e Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Mon, 14 Aug 2017 10:14:30 -0400 Subject: [PATCH] Add 'ark backup logs' command for retrieval Signed-off-by: Andy Goldstein --- examples/common/00-prereqs.yaml | 15 + pkg/apis/ark/v1/download_request.go | 82 +++++ pkg/apis/ark/v1/register.go | 2 + .../aws/object_storage_adapter.go | 12 + .../azure/object_storage_adapter.go | 17 ++ pkg/cloudprovider/backup_service.go | 33 +- pkg/cloudprovider/backup_service_test.go | 5 + .../gcp/object_storage_adapter.go | 30 +- pkg/cloudprovider/storage_interfaces.go | 8 +- pkg/cmd/cli/backup/backup.go | 1 + pkg/cmd/cli/backup/logs.go | 55 ++++ pkg/cmd/server/server.go | 39 ++- .../util/downloadrequest/downloadrequest.go | 135 +++++++++ .../downloadrequest/downloadrequest_test.go | 224 ++++++++++++++ pkg/controller/download_request_controller.go | 285 ++++++++++++++++++ pkg/controller/gc_controller_test.go | 10 + pkg/util/test/backup_service.go | 101 +++++++ pkg/util/test/object_storage_adapter.go | 100 ++++++ 18 files changed, 1142 insertions(+), 12 deletions(-) create mode 100644 pkg/apis/ark/v1/download_request.go create mode 100644 pkg/cmd/cli/backup/logs.go create mode 100644 pkg/cmd/util/downloadrequest/downloadrequest.go create mode 100644 pkg/cmd/util/downloadrequest/downloadrequest_test.go create mode 100644 pkg/controller/download_request_controller.go create mode 100644 pkg/util/test/backup_service.go create mode 100644 pkg/util/test/object_storage_adapter.go diff --git a/examples/common/00-prereqs.yaml b/examples/common/00-prereqs.yaml index 1467ddb5b..4bbeda707 100644 --- a/examples/common/00-prereqs.yaml +++ b/examples/common/00-prereqs.yaml @@ -72,6 +72,21 @@ spec: plural: configs kind: Config +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: downloadrequests.ark.heptio.com + labels: + component: ark +spec: + group: ark.heptio.com + version: v1 + scope: Namespaced + names: + 
plural: downloadrequests + kind: DownloadRequest + --- apiVersion: v1 kind: Namespace diff --git a/pkg/apis/ark/v1/download_request.go b/pkg/apis/ark/v1/download_request.go new file mode 100644 index 000000000..feadaf22e --- /dev/null +++ b/pkg/apis/ark/v1/download_request.go @@ -0,0 +1,82 @@ +/* +Copyright 2017 Heptio Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + +// DownloadRequestSpec is the specification for a download request. +type DownloadRequestSpec struct { + // Target is what to download (e.g. logs for a backup). + Target DownloadTarget `json:"target"` +} + +// DownloadTargetKind represents what type of file to download. +type DownloadTargetKind string + +const ( + DownloadTargetKindBackupLog DownloadTargetKind = "BackupLog" +) + +// DownloadTarget is the specification for what kind of file to download, and the name of the +// resource with which it's associated. +type DownloadTarget struct { + // Kind is the type of file to download. + Kind DownloadTargetKind `json:"kind"` + // Name is the name of the kubernetes resource with which the file is associated. + Name string `json:"name"` +} + +// DownloadRequestPhase represents the lifecycle phase of a DownloadRequest. +type DownloadRequestPhase string + +const ( + // DownloadRequestPhaseNew means the DownloadRequest has not been processed by the + // DownloadRequestController yet. 
+ DownloadRequestPhaseNew DownloadRequestPhase = "New" + // DownloadRequestPhaseProcessed means the DownloadRequest has been processed by the + // DownloadRequestController. + DownloadRequestPhaseProcessed DownloadRequestPhase = "Processed" +) + +// DownloadRequestStatus is the current status of a DownloadRequest. +type DownloadRequestStatus struct { + // Phase is the current state of the DownloadRequest. + Phase DownloadRequestPhase `json:"phase"` + // DownloadURL contains the pre-signed URL for the target file. + DownloadURL string `json:"downloadURL"` + // Expiration is when this DownloadRequest expires and can be deleted by the system. + Expiration metav1.Time `json:"expiration"` +} + +// +genclient=true + +// DownloadRequest is a request to download an artifact from backup object storage, such as a backup +// log file. +type DownloadRequest struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata"` + + Spec DownloadRequestSpec `json:"spec"` + Status DownloadRequestStatus `json:"status,omitempty"` +} + +// DownloadRequestList is a list of DownloadRequests. 
+type DownloadRequestList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + Items []DownloadRequest `json:"items"` +} diff --git a/pkg/apis/ark/v1/register.go b/pkg/apis/ark/v1/register.go index 226435f1a..fab1fd4e5 100644 --- a/pkg/apis/ark/v1/register.go +++ b/pkg/apis/ark/v1/register.go @@ -51,6 +51,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &RestoreList{}, &Config{}, &ConfigList{}, + &DownloadRequest{}, + &DownloadRequestList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/pkg/cloudprovider/aws/object_storage_adapter.go b/pkg/cloudprovider/aws/object_storage_adapter.go index 9b7d9ea17..aad00cdeb 100644 --- a/pkg/cloudprovider/aws/object_storage_adapter.go +++ b/pkg/cloudprovider/aws/object_storage_adapter.go @@ -19,10 +19,12 @@ package aws import ( "errors" "io" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/endpoints" "github.com/aws/aws-sdk-go/service/s3" + "github.com/golang/glog" "github.com/heptio/ark/pkg/cloudprovider" ) @@ -131,3 +133,13 @@ func (op *objectStorageAdapter) DeleteObject(bucket string, key string) error { return err } + +func (op *objectStorageAdapter) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) { + glog.V(4).Infof("CreateSignedURL: bucket=%s, key=%s, ttl=%d", bucket, key, ttl) + req, _ := op.s3.GetObjectRequest(&s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + + return req.Presign(ttl) +} diff --git a/pkg/cloudprovider/azure/object_storage_adapter.go b/pkg/cloudprovider/azure/object_storage_adapter.go index e2841c964..baf368346 100644 --- a/pkg/cloudprovider/azure/object_storage_adapter.go +++ b/pkg/cloudprovider/azure/object_storage_adapter.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "strings" + "time" "github.com/Azure/azure-sdk-for-go/storage" @@ -134,6 +135,22 @@ func (op *objectStorageAdapter) DeleteObject(bucket string, key string) error { return blob.Delete(nil) } +const 
sasURIReadPermission = "r" + +func (op *objectStorageAdapter) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) { + container, err := getContainerReference(op.blobClient, bucket) + if err != nil { + return "", err + } + + blob, err := getBlobReference(container, key) + if err != nil { + return "", err + } + + return blob.GetSASURI(time.Now().Add(ttl), sasURIReadPermission) +} + func getContainerReference(blobClient *storage.BlobStorageClient, bucket string) (*storage.Container, error) { container := blobClient.GetContainerReference(bucket) if container == nil { diff --git a/pkg/cloudprovider/backup_service.go b/pkg/cloudprovider/backup_service.go index a14565057..be4290bed 100644 --- a/pkg/cloudprovider/backup_service.go +++ b/pkg/cloudprovider/backup_service.go @@ -53,6 +53,10 @@ type BackupService interface { // GetBackup gets the specified api.Backup from the given bucket in object storage. GetBackup(bucket, name string) (*api.Backup, error) + + // CreateBackupLogSignedURL creates a pre-signed URL that can be used to download a backup's log + // file from object storage. The URL expires after ttl. + CreateBackupLogSignedURL(bucket, backupName string, ttl time.Duration) (string, error) } // BackupGetter knows how to list backups in object storage. 
@@ -67,6 +71,18 @@ const ( logFileFormatString = "%s/%s.log.gz" ) +func getMetadataKey(backup string) string { + return fmt.Sprintf(metadataFileFormatString, backup) +} + +func getBackupKey(backup string) string { + return fmt.Sprintf(backupFileFormatString, backup, backup) +} + +func getBackupLogKey(backup string) string { + return fmt.Sprintf(logFileFormatString, backup, backup) +} + type backupService struct { objectStorage ObjectStorageAdapter decoder runtime.Decoder @@ -85,15 +101,14 @@ func NewBackupService(objectStorage ObjectStorageAdapter) BackupService { func (br *backupService) UploadBackup(bucket, backupName string, metadata, backup, log io.ReadSeeker) error { // upload metadata file - metadataKey := fmt.Sprintf(metadataFileFormatString, backupName) + metadataKey := getMetadataKey(backupName) if err := br.objectStorage.PutObject(bucket, metadataKey, metadata); err != nil { // failure to upload metadata file is a hard-stop return err } // upload tar file - backupKey := fmt.Sprintf(backupFileFormatString, backupName, backupName) - if err := br.objectStorage.PutObject(bucket, backupKey, backup); err != nil { + if err := br.objectStorage.PutObject(bucket, getBackupKey(backupName), backup); err != nil { // try to delete the metadata file since the data upload failed deleteErr := br.objectStorage.DeleteObject(bucket, metadataKey) @@ -102,7 +117,7 @@ func (br *backupService) UploadBackup(bucket, backupName string, metadata, backu // uploading log file is best-effort; if it fails, we log the error but call the overall upload a // success - logKey := fmt.Sprintf(logFileFormatString, backupName, backupName) + logKey := getBackupLogKey(backupName) if err := br.objectStorage.PutObject(bucket, logKey, log); err != nil { glog.Errorf("error uploading %s/%s: %v", bucket, logKey, err) } @@ -111,7 +126,7 @@ func (br *backupService) UploadBackup(bucket, backupName string, metadata, backu } func (br *backupService) DownloadBackup(bucket, backupName string) (io.ReadCloser, 
error) { - return br.objectStorage.GetObject(bucket, fmt.Sprintf(backupFileFormatString, backupName, backupName)) + return br.objectStorage.GetObject(bucket, getBackupKey(backupName)) } func (br *backupService) GetAllBackups(bucket string) ([]*api.Backup, error) { @@ -166,17 +181,21 @@ func (br *backupService) GetBackup(bucket, name string) (*api.Backup, error) { } func (br *backupService) DeleteBackupFile(bucket, backupName string) error { - key := fmt.Sprintf(backupFileFormatString, backupName, backupName) + key := getBackupKey(backupName) glog.V(4).Infof("Trying to delete bucket=%s, key=%s", bucket, key) return br.objectStorage.DeleteObject(bucket, key) } func (br *backupService) DeleteBackupMetadataFile(bucket, backupName string) error { - key := fmt.Sprintf(metadataFileFormatString, backupName) + key := getMetadataKey(backupName) glog.V(4).Infof("Trying to delete bucket=%s, key=%s", bucket, key) return br.objectStorage.DeleteObject(bucket, key) } +func (br *backupService) CreateBackupLogSignedURL(bucket, backupName string, ttl time.Duration) (string, error) { + return br.objectStorage.CreateSignedURL(bucket, getBackupLogKey(backupName), ttl) +} + // cachedBackupService wraps a real backup service with a cache for getting cloud backups. 
type cachedBackupService struct { BackupService diff --git a/pkg/cloudprovider/backup_service_test.go b/pkg/cloudprovider/backup_service_test.go index d7199e57c..37645e4fc 100644 --- a/pkg/cloudprovider/backup_service_test.go +++ b/pkg/cloudprovider/backup_service_test.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "strings" "testing" + "time" "github.com/stretchr/testify/assert" @@ -465,3 +466,7 @@ func (os *fakeObjectStorage) DeleteObject(bucket string, key string) error { return nil } + +func (os *fakeObjectStorage) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) { + panic("not implemented") +} diff --git a/pkg/cloudprovider/gcp/object_storage_adapter.go b/pkg/cloudprovider/gcp/object_storage_adapter.go index 55ad3b3ae..9542180b4 100644 --- a/pkg/cloudprovider/gcp/object_storage_adapter.go +++ b/pkg/cloudprovider/gcp/object_storage_adapter.go @@ -17,23 +17,29 @@ limitations under the License. package gcp import ( + "errors" "io" "strings" + "time" "golang.org/x/oauth2" "golang.org/x/oauth2/google" + // TODO switch to using newstorage + newstorage "cloud.google.com/go/storage" storage "google.golang.org/api/storage/v1" "github.com/heptio/ark/pkg/cloudprovider" ) type objectStorageAdapter struct { - gcs *storage.Service + gcs *storage.Service + googleAccessID string + privateKey []byte } var _ cloudprovider.ObjectStorageAdapter = &objectStorageAdapter{} -func NewObjectStorageAdapter() (cloudprovider.ObjectStorageAdapter, error) { +func NewObjectStorageAdapter(googleAccessID string, privateKey []byte) (cloudprovider.ObjectStorageAdapter, error) { client, err := google.DefaultClient(oauth2.NoContext, storage.DevstorageReadWriteScope) if err != nil { return nil, err @@ -45,7 +51,9 @@ func NewObjectStorageAdapter() (cloudprovider.ObjectStorageAdapter, error) { } return &objectStorageAdapter{ - gcs: gcs, + gcs: gcs, + googleAccessID: googleAccessID, + privateKey: privateKey, }, nil } @@ -87,3 +95,19 @@ func (op *objectStorageAdapter) 
ListCommonPrefixes(bucket string, delimiter stri func (op *objectStorageAdapter) DeleteObject(bucket string, key string) error { return op.gcs.Objects.Delete(bucket, key).Do() } + +func (op *objectStorageAdapter) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) { + if op.googleAccessID == "" { + return "", errors.New("unable to create a pre-signed URL - make sure GOOGLE_APPLICATION_CREDENTIALS points to a valid GCE service account file (missing email address)") + } + if len(op.privateKey) == 0 { + return "", errors.New("unable to create a pre-signed URL - make sure GOOGLE_APPLICATION_CREDENTIALS points to a valid GCE service account file (missing private key)") + } + + return newstorage.SignedURL(bucket, key, &newstorage.SignedURLOptions{ + GoogleAccessID: op.googleAccessID, + PrivateKey: op.privateKey, + Method: "GET", + Expires: time.Now().Add(ttl), + }) +} diff --git a/pkg/cloudprovider/storage_interfaces.go b/pkg/cloudprovider/storage_interfaces.go index 699e9c3a1..935f7aa13 100644 --- a/pkg/cloudprovider/storage_interfaces.go +++ b/pkg/cloudprovider/storage_interfaces.go @@ -16,7 +16,10 @@ limitations under the License. package cloudprovider -import "io" +import ( + "io" + "time" +) // ObjectStorageAdapter exposes basic object-storage operations required // by Ark. @@ -37,6 +40,9 @@ type ObjectStorageAdapter interface { // DeleteObject removes object with the specified key from the given // bucket. DeleteObject(bucket string, key string) error + + // CreateSignedURL creates a pre-signed URL for the given bucket and key that expires after ttl. 
+ CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) } // BlockStorageAdapter exposes basic block-storage operations required diff --git a/pkg/cmd/cli/backup/backup.go b/pkg/cmd/cli/backup/backup.go index 3b7604482..96ece4f17 100644 --- a/pkg/cmd/cli/backup/backup.go +++ b/pkg/cmd/cli/backup/backup.go @@ -32,6 +32,7 @@ func NewCommand(f client.Factory) *cobra.Command { c.AddCommand( NewCreateCommand(f), NewGetCommand(f), + NewLogsCommand(f), // Will implement describe later // NewDescribeCommand(f), diff --git a/pkg/cmd/cli/backup/logs.go b/pkg/cmd/cli/backup/logs.go new file mode 100644 index 000000000..07ccd2d67 --- /dev/null +++ b/pkg/cmd/cli/backup/logs.go @@ -0,0 +1,55 @@ +/* +Copyright 2017 Heptio Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package backup + +import ( + "errors" + "os" + "time" + + "github.com/spf13/cobra" + + "github.com/heptio/ark/pkg/apis/ark/v1" + "github.com/heptio/ark/pkg/client" + "github.com/heptio/ark/pkg/cmd" + "github.com/heptio/ark/pkg/cmd/util/downloadrequest" +) + +func NewLogsCommand(f client.Factory) *cobra.Command { + timeout := time.Minute + + c := &cobra.Command{ + Use: "logs BACKUP", + Short: "Get backup logs", + Run: func(c *cobra.Command, args []string) { + if len(args) != 1 { + err := errors.New("backup name is required") + cmd.CheckError(err) + } + + arkClient, err := f.Client() + cmd.CheckError(err) + + err = downloadrequest.Stream(arkClient.ArkV1(), args[0], v1.DownloadTargetKindBackupLog, os.Stdout, timeout) + cmd.CheckError(err) + }, + } + + c.Flags().DurationVar(&timeout, "timeout", timeout, "how long to wait to receive logs") + + return c +} diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index ec75152f0..3f46041cb 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -19,10 +19,14 @@ package server import ( "context" "fmt" + "io/ioutil" + "os" "reflect" "sync" "time" + "golang.org/x/oauth2/google" + "github.com/golang/glog" "github.com/spf13/cobra" @@ -305,7 +309,27 @@ func getObjectStorageProvider(cloudConfig api.CloudProviderConfig, field string) cloudConfig.AWS.KMSKeyID, cloudConfig.AWS.S3ForcePathStyle) case cloudConfig.GCP != nil: - objectStorage, err = gcp.NewObjectStorageAdapter() + var email string + var privateKey []byte + + credentialsFile := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") + if credentialsFile != "" { + // Get the email and private key from the credentials file so we can pre-sign download URLs + creds, err := ioutil.ReadFile(credentialsFile) + if err != nil { + return nil, err + } + jwtConfig, err := google.JWTConfigFromJSON(creds) + if err != nil { + return nil, err + } + email = jwtConfig.Email + privateKey = jwtConfig.PrivateKey + } else { + glog.Warning("GOOGLE_APPLICATION_CREDENTIALS is 
undefined; some features such as downloading log files will not work") + } + + objectStorage, err = gcp.NewObjectStorageAdapter(email, privateKey) case cloudConfig.Azure != nil: objectStorage, err = azure.NewObjectStorageAdapter() } @@ -465,6 +489,19 @@ func (s *server) runControllers(config *api.Config) error { wg.Done() }() + downloadRequestController := controller.NewDownloadRequestController( + s.arkClient.ArkV1(), + s.sharedInformerFactory.Ark().V1().DownloadRequests(), + s.sharedInformerFactory.Ark().V1().Backups(), + s.backupService, + config.BackupStorageProvider.Bucket, + ) + wg.Add(1) + go func() { + downloadRequestController.Run(ctx, 1) + wg.Done() + }() + // SHARED INFORMERS HAVE TO BE STARTED AFTER ALL CONTROLLERS go s.sharedInformerFactory.Start(ctx.Done()) diff --git a/pkg/cmd/util/downloadrequest/downloadrequest.go b/pkg/cmd/util/downloadrequest/downloadrequest.go new file mode 100644 index 000000000..cfc1b61a5 --- /dev/null +++ b/pkg/cmd/util/downloadrequest/downloadrequest.go @@ -0,0 +1,135 @@ +/* +Copyright 2017 Heptio Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package downloadrequest + +import ( + "compress/gzip" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + + "github.com/heptio/ark/pkg/apis/ark/v1" + arkclientv1 "github.com/heptio/ark/pkg/generated/clientset/typed/ark/v1" +) + +func Stream(client arkclientv1.DownloadRequestsGetter, name string, kind v1.DownloadTargetKind, w io.Writer, timeout time.Duration) error { + req := &v1.DownloadRequest{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: v1.DefaultNamespace, + Name: fmt.Sprintf("%s-%s", name, time.Now().Format("20060102150405")), + }, + Spec: v1.DownloadRequestSpec{ + Target: v1.DownloadTarget{ + Kind: kind, + Name: name, + }, + }, + } + + req, err := client.DownloadRequests(v1.DefaultNamespace).Create(req) + if err != nil { + return err + } + defer client.DownloadRequests(v1.DefaultNamespace).Delete(req.Name, nil) + + listOptions := metav1.ListOptions{ + //TODO: once kube-apiserver http://issue.k8s.io/51046 is fixed, uncomment + //FieldSelector: "metadata.name=" + req.Name + ResourceVersion: req.ResourceVersion, + } + watcher, err := client.DownloadRequests(v1.DefaultNamespace).Watch(listOptions) + if err != nil { + return err + } + defer watcher.Stop() + + expired := time.NewTimer(timeout) + defer expired.Stop() + +Loop: + for { + select { + case <-expired.C: + return errors.New("timed out waiting for download URL") + case e := <-watcher.ResultChan(): + updated, ok := e.Object.(*v1.DownloadRequest) + if !ok { + return fmt.Errorf("unexpected type %T", e.Object) + } + + if updated.Name != req.Name { + continue + } + + switch e.Type { + case watch.Deleted: + return errors.New("download request was unexpectedly deleted") + case watch.Modified: + if updated.Status.DownloadURL != "" { + req = updated + break Loop + } + } + } + } + + if req.Status.DownloadURL == "" { + return errors.New("file not found") + } + + httpClient := new(http.Client) + + httpReq, err := 
http.NewRequest("GET", req.Status.DownloadURL, nil) + if err != nil { + return err + } + + // Manually set this header so the net/http library does not automatically try to decompress. We + // need to handle this manually because it's not currently possible to set the MIME type for the + // pre-signed URLs for GCP or Azure. + httpReq.Header.Set("Accept-Encoding", "gzip") + + resp, err := httpClient.Do(httpReq) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("request failed; unable to decode response body: %v", err) + } + + return fmt.Errorf("request failed: %v", string(body)) + } + + gzipReader, err := gzip.NewReader(resp.Body) + if err != nil { + return err + } + defer gzipReader.Close() + + _, err = io.Copy(w, gzipReader) + return err +} diff --git a/pkg/cmd/util/downloadrequest/downloadrequest_test.go b/pkg/cmd/util/downloadrequest/downloadrequest_test.go new file mode 100644 index 000000000..3b89617a1 --- /dev/null +++ b/pkg/cmd/util/downloadrequest/downloadrequest_test.go @@ -0,0 +1,224 @@ +/* +Copyright 2017 Heptio Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package downloadrequest + +import ( + "bytes" + "compress/gzip" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/heptio/ark/pkg/apis/ark/v1" + "github.com/heptio/ark/pkg/generated/clientset/fake" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + core "k8s.io/client-go/testing" +) + +func TestStream(t *testing.T) { + tests := []struct { + name string + kind v1.DownloadTargetKind + timeout time.Duration + createError error + watchError error + watchAdds []runtime.Object + watchModifies []runtime.Object + watchDeletes []runtime.Object + updateWithURL bool + statusCode int + body string + deleteError error + expectedError string + }{ + { + name: "error creating req", + createError: errors.New("foo"), + kind: v1.DownloadTargetKindBackupLog, + expectedError: "foo", + }, + { + name: "error creating watch", + watchError: errors.New("bar"), + kind: v1.DownloadTargetKindBackupLog, + expectedError: "bar", + }, + { + name: "timed out", + kind: v1.DownloadTargetKindBackupLog, + timeout: time.Millisecond, + expectedError: "timed out waiting for download URL", + }, + { + name: "unexpected watch type", + kind: v1.DownloadTargetKindBackupLog, + watchAdds: []runtime.Object{&v1.Backup{}}, + expectedError: "unexpected type *v1.Backup", + }, + { + name: "other requests added/updated/deleted first", + kind: v1.DownloadTargetKindBackupLog, + watchAdds: []runtime.Object{ + newDownloadRequest("foo").DownloadRequest, + }, + watchModifies: []runtime.Object{ + newDownloadRequest("foo").DownloadRequest, + }, + watchDeletes: []runtime.Object{ + newDownloadRequest("foo").DownloadRequest, + }, + updateWithURL: true, + statusCode: http.StatusOK, + body: "download body", + }, + { + name: "http error", + kind: v1.DownloadTargetKindBackupLog, + updateWithURL: true, + statusCode: 
http.StatusInternalServerError, + body: "some error", + expectedError: "request failed: some error", + }, + } + + const testTimeout = 30 * time.Second + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + client := fake.NewSimpleClientset() + + created := make(chan *v1.DownloadRequest, 1) + client.PrependReactor("create", "downloadrequests", func(action core.Action) (bool, runtime.Object, error) { + createAction := action.(core.CreateAction) + created <- createAction.GetObject().(*v1.DownloadRequest) + return true, createAction.GetObject(), test.createError + }) + + fakeWatch := watch.NewFake() + client.PrependWatchReactor("downloadrequests", core.DefaultWatchReactor(fakeWatch, test.watchError)) + + deleted := make(chan string, 1) + client.PrependReactor("delete", "downloadrequests", func(action core.Action) (bool, runtime.Object, error) { + deleteAction := action.(core.DeleteAction) + deleted <- deleteAction.GetName() + return true, nil, test.deleteError + }) + + timeout := test.timeout + if timeout == 0 { + timeout = testTimeout + } + + var server *httptest.Server + var url string + if test.updateWithURL { + server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(test.statusCode) + if test.statusCode == http.StatusOK { + gzipWriter := gzip.NewWriter(w) + fmt.Fprintf(gzipWriter, test.body) + gzipWriter.Close() + return + } + fmt.Fprintf(w, test.body) + })) + defer server.Close() + url = server.URL + } + + output := new(bytes.Buffer) + errCh := make(chan error) + go func() { + err := Stream(client.ArkV1(), "name", test.kind, output, timeout) + errCh <- err + }() + + for i := range test.watchAdds { + fakeWatch.Add(test.watchAdds[i]) + } + for i := range test.watchModifies { + fakeWatch.Modify(test.watchModifies[i]) + } + for i := range test.watchDeletes { + fakeWatch.Delete(test.watchDeletes[i]) + } + + var createdName string + if test.updateWithURL { + select { + case r := <-created: + 
createdName = r.Name + r.Status.DownloadURL = url + fakeWatch.Modify(r) + case <-time.After(testTimeout): + t.Fatalf("created object not received") + } + + } + + var err error + select { + case err = <-errCh: + case <-time.After(testTimeout): + t.Fatal("test timed out") + } + + if test.expectedError != "" { + require.EqualError(t, err, test.expectedError) + return + } + + require.NoError(t, err) + + if test.statusCode != http.StatusOK { + assert.EqualError(t, err, "request failed: "+test.body) + return + } + + assert.Equal(t, test.body, output.String()) + + select { + case name := <-deleted: + assert.Equal(t, createdName, name) + default: + t.Fatal("download request was not deleted") + } + }) + } +} + +type downloadRequest struct { + *v1.DownloadRequest +} + +func newDownloadRequest(name string) *downloadRequest { + return &downloadRequest{ + DownloadRequest: &v1.DownloadRequest{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: v1.DefaultNamespace, + Name: name, + }, + }, + } +} diff --git a/pkg/controller/download_request_controller.go b/pkg/controller/download_request_controller.go new file mode 100644 index 000000000..0f798ba58 --- /dev/null +++ b/pkg/controller/download_request_controller.go @@ -0,0 +1,285 @@ +/* +Copyright 2017 Heptio Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/golang/glog" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + "github.com/heptio/ark/pkg/apis/ark/v1" + "github.com/heptio/ark/pkg/cloudprovider" + "github.com/heptio/ark/pkg/generated/clientset/scheme" + arkv1client "github.com/heptio/ark/pkg/generated/clientset/typed/ark/v1" + informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" + listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" +) + +type downloadRequestController struct { + downloadRequestClient arkv1client.DownloadRequestsGetter + downloadRequestLister listers.DownloadRequestLister + downloadRequestListerSynced cache.InformerSynced + backupLister listers.BackupLister + backupListerSynced cache.InformerSynced + + backupService cloudprovider.BackupService + bucket string + + syncHandler func(key string) error + queue workqueue.RateLimitingInterface + + clock clock.Clock +} + +// NewDownloadRequestController creates a new DownloadRequestController. 
+func NewDownloadRequestController( + downloadRequestClient arkv1client.DownloadRequestsGetter, + downloadRequestInformer informers.DownloadRequestInformer, + backupInformer informers.BackupInformer, + backupService cloudprovider.BackupService, + bucket string, +) Interface { + c := &downloadRequestController{ + downloadRequestClient: downloadRequestClient, + downloadRequestLister: downloadRequestInformer.Lister(), + downloadRequestListerSynced: downloadRequestInformer.Informer().HasSynced, + backupLister: backupInformer.Lister(), + backupListerSynced: backupInformer.Informer().HasSynced, + + backupService: backupService, + bucket: bucket, + + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "downloadrequest"), + + clock: &clock.RealClock{}, + } + + c.syncHandler = c.processDownloadRequest + + downloadRequestInformer.Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(fmt.Errorf("error creating queue key for %#v: %v", obj, err)) + return + } + c.queue.Add(key) + }, + }, + ) + + return c +} + +// Run is a blocking function that runs the specified number of worker goroutines +// to process items in the work queue. It will return when it receives on the +// ctx.Done() channel. +func (c *downloadRequestController) Run(ctx context.Context, numWorkers int) error { + var wg sync.WaitGroup + + defer func() { + glog.Infof("Waiting for workers to finish their work") + + c.queue.ShutDown() + + // We have to wait here in the deferred function instead of at the bottom of the function body + // because we have to shut down the queue in order for the workers to shut down gracefully, and + // we want to shut down the queue via defer and not at the end of the body. 
+ wg.Wait() + + glog.Infof("All workers have finished") + }() + + glog.Info("Starting DownloadRequestController") + defer glog.Infof("Shutting down DownloadRequestController") + + glog.Info("Waiting for caches to sync") + if !cache.WaitForCacheSync(ctx.Done(), c.downloadRequestListerSynced, c.backupListerSynced) { + return errors.New("timed out waiting for caches to sync") + } + glog.Info("Caches are synced") + + wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { + go func() { + wait.Until(c.runWorker, time.Second, ctx.Done()) + wg.Done() + }() + } + + wg.Add(1) + go func() { + wait.Until(c.resync, time.Minute, ctx.Done()) + wg.Done() + }() + + <-ctx.Done() + + return nil +} + +// runWorker runs a worker until the controller's queue indicates it's time to shut down. +func (c *downloadRequestController) runWorker() { + // continually take items off the queue (waits if it's + // empty) until we get a shutdown signal from the queue + for c.processNextWorkItem() { + } +} + +// processNextWorkItem processes a single item from the queue. +func (c *downloadRequestController) processNextWorkItem() bool { + key, quit := c.queue.Get() + if quit { + return false + } + // always call done on this item, since if it fails we'll add + // it back with rate-limiting below + defer c.queue.Done(key) + + err := c.syncHandler(key.(string)) + if err == nil { + // If you had no error, tell the queue to stop tracking history for your key. This will reset + // things like failure counts for per-item rate limiting. + c.queue.Forget(key) + return true + } + + glog.Errorf("syncHandler error: %v", err) + // we had an error processing the item so add it back + // into the queue for re-processing with rate-limiting + c.queue.AddRateLimited(key) + + return true +} + +// processDownloadRequest is the default per-item sync handler. It generates a pre-signed URL for +// a new DownloadRequest or deletes the DownloadRequest if it has expired. 
+func (c *downloadRequestController) processDownloadRequest(key string) error { + glog.V(4).Infof("processDownloadRequest for key %q", key) + ns, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + glog.V(4).Infof("error splitting key %q: %v", key, err) + return err + } + + downloadRequest, err := c.downloadRequestLister.DownloadRequests(ns).Get(name) + if apierrors.IsNotFound(err) { + glog.V(4).Infof("unable to find DownloadRequest %q", key) + return nil + } + if err != nil { + glog.V(4).Infof("error getting DownloadRequest %q: %v", key, err) + return err + } + + switch downloadRequest.Status.Phase { + case "", v1.DownloadRequestPhaseNew: + return c.generatePreSignedURL(downloadRequest) + case v1.DownloadRequestPhaseProcessed: + return c.deleteIfExpired(downloadRequest) + } + + return nil +} + +const signedURLTTL = 10 * time.Minute + +// generatePreSignedURL generates a pre-signed URL for downloadRequest, changes the phase to +// Processed, and persists the changes to storage. +func (c *downloadRequestController) generatePreSignedURL(downloadRequest *v1.DownloadRequest) error { + switch downloadRequest.Spec.Target.Kind { + case v1.DownloadTargetKindBackupLog: + update, err := cloneDownloadRequest(downloadRequest) + if err != nil { + return err + } + + update.Status.DownloadURL, err = c.backupService.CreateBackupLogSignedURL(c.bucket, update.Spec.Target.Name, signedURLTTL) + if err != nil { + return err + } + + update.Status.Phase = v1.DownloadRequestPhaseProcessed + update.Status.Expiration = metav1.NewTime(c.clock.Now().Add(signedURLTTL)) + + _, err = c.downloadRequestClient.DownloadRequests(update.Namespace).Update(update) + return err + } + + return fmt.Errorf("unsupported download target kind %q", downloadRequest.Spec.Target.Kind) +} + +// deleteIfExpired deletes downloadRequest if it has expired. 
+func (c *downloadRequestController) deleteIfExpired(downloadRequest *v1.DownloadRequest) error { + glog.V(4).Infof("checking for expiration of %s/%s", downloadRequest.Namespace, downloadRequest.Name) + if downloadRequest.Status.Expiration.Time.Before(c.clock.Now()) { + glog.V(4).Infof("%s/%s has not expired", downloadRequest.Namespace, downloadRequest.Name) + return nil + } + + glog.V(4).Infof("%s/%s has expired - deleting", downloadRequest.Namespace, downloadRequest.Name) + return c.downloadRequestClient.DownloadRequests(downloadRequest.Namespace).Delete(downloadRequest.Name, nil) +} + +// resync requeues all the DownloadRequests in the lister's cache. This is mostly to handle deleting +// any expired requests that were not deleted as part of the normal client flow for whatever reason. +func (c *downloadRequestController) resync() { + list, err := c.downloadRequestLister.List(labels.Everything()) + if err != nil { + runtime.HandleError(fmt.Errorf("error listing download requests: %v", err)) + return + } + + for _, dr := range list { + key, err := cache.MetaNamespaceKeyFunc(dr) + if err != nil { + runtime.HandleError(fmt.Errorf("error generating key for download request %#v: %v", dr, err)) + continue + } + + c.queue.Add(key) + } +} + +// cloneDownloadRequest makes a deep copy of in. 
func cloneDownloadRequest(in *v1.DownloadRequest) (*v1.DownloadRequest, error) {
	// scheme.Scheme.DeepCopy is reflection-based and returns interface{}, so
	// the result must be type-asserted back to *v1.DownloadRequest below.
	clone, err := scheme.Scheme.DeepCopy(in)
	if err != nil {
		return nil, err
	}

	out, ok := clone.(*v1.DownloadRequest)
	if !ok {
		return nil, fmt.Errorf("unexpected type: %T", clone)
	}

	return out, nil
}
diff --git a/pkg/controller/gc_controller_test.go b/pkg/controller/gc_controller_test.go
index 4975d8c70..3d7277396 100644
--- a/pkg/controller/gc_controller_test.go
+++ b/pkg/controller/gc_controller_test.go
@@ -19,6 +19,7 @@ package controller
 import (
 	"bytes"
 	"errors"
+	"fmt"
 	"io"
 	"io/ioutil"
 	"testing"
@@ -516,6 +517,7 @@ func TestGarbageCollectPicksUpBackupUponExpiration(t *testing.T) {
 	assert.Equal(0, len(snapshotService.SnapshotsTaken), "snapshots should have been garbage-collected.")
 }
 
+// TODO remove this and use util/test mock instead
 type fakeBackupService struct {
 	backupMetadataByBucket map[string][]*api.Backup
 	backupsByBucket        map[string][]*api.Backup
@@ -554,6 +556,10 @@ func (s *fakeBackupService) DownloadBackup(bucket, name string) (io.ReadCloser,
 	return ioutil.NopCloser(bytes.NewReader([]byte("hello world"))), nil
 }
 
+func (s *fakeBackupService) DownloadBackupLogs(bucket, name string) (io.ReadCloser, error) {
+	return ioutil.NopCloser(bytes.NewReader([]byte("hello world in a log"))), nil
+}
+
 func (s *fakeBackupService) DeleteBackupMetadataFile(bucket, backupName string) error {
 	backups, found := s.backupMetadataByBucket[bucket]
 	if !found {
@@ -599,3 +605,7 @@ func (s *fakeBackupService) DeleteBackupFile(bucket, backupName string) error {
 
 	return nil
 }
+
+func (s *fakeBackupService) CreateBackupLogSignedURL(bucket, backupName string, ttl time.Duration) (string, error) {
+	return fmt.Sprintf("http://some.server/%s/%s/%d", bucket, backupName, ttl), nil
+}
diff --git a/pkg/util/test/backup_service.go b/pkg/util/test/backup_service.go
new file mode 100644
index 000000000..be85eb747
--- /dev/null
+++ b/pkg/util/test/backup_service.go
@@ -0,0 +1,101 @@
+/*
Copyright 2017 Heptio Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Code generated by mockery v1.0.0. DO NOT EDIT.

package test

import io "io"
import mock "github.com/stretchr/testify/mock"
import v1 "github.com/heptio/ark/pkg/apis/ark/v1"

// BackupService is an autogenerated mock type for the BackupService type
type BackupService struct {
	mock.Mock
}

// DeleteBackup provides a mock function with given fields: bucket, backupName
func (_m *BackupService) DeleteBackup(bucket string, backupName string) error {
	ret := _m.Called(bucket, backupName)

	var r0 error
	if rf, ok := ret.Get(0).(func(string, string) error); ok {
		r0 = rf(bucket, backupName)
	} else {
		r0 = ret.Error(0)
	}

	return r0
}

// DownloadBackup provides a mock function with given fields: bucket, name
func (_m *BackupService) DownloadBackup(bucket string, name string) (io.ReadCloser, error) {
	ret := _m.Called(bucket, name)

	var r0 io.ReadCloser
	if rf, ok := ret.Get(0).(func(string, string) io.ReadCloser); ok {
		r0 = rf(bucket, name)
	} else {
		if ret.Get(0) != nil {
			r0 = ret.Get(0).(io.ReadCloser)
		}
	}

	var r1 error
	if rf, ok := ret.Get(1).(func(string, string) error); ok {
		r1 = rf(bucket, name)
	} else {
		r1 = ret.Error(1)
	}

	return r0, r1
}

// GetAllBackups provides a mock function with given fields: bucket
func (_m *BackupService) GetAllBackups(bucket string) ([]*v1.Backup, error) {
	ret := _m.Called(bucket)

	var r0 []*v1.Backup
	if rf, ok := ret.Get(0).(func(string) []*v1.Backup); ok {
		r0 = rf(bucket)
	} else {
		if ret.Get(0) != nil {
			r0 = ret.Get(0).([]*v1.Backup)
		}
	}

	var r1 error
	if rf, ok := ret.Get(1).(func(string) error); ok {
		r1 = rf(bucket)
	} else {
		r1 = ret.Error(1)
	}

	return r0, r1
}

// UploadBackup provides a mock function with given fields: bucket, name, metadata, backup, log
func (_m *BackupService) UploadBackup(bucket string, name string, metadata io.ReadSeeker, backup io.ReadSeeker, log io.ReadSeeker) error {
	ret := _m.Called(bucket, name, metadata, backup, log)

	var r0 error
	if rf, ok := ret.Get(0).(func(string, string, io.ReadSeeker, io.ReadSeeker, io.ReadSeeker) error); ok {
		r0 = rf(bucket, name, metadata, backup, log)
	} else {
		r0 = ret.Error(0)
	}

	return r0
}
diff --git a/pkg/util/test/object_storage_adapter.go b/pkg/util/test/object_storage_adapter.go
new file mode 100644
index 000000000..70e1547b5
--- /dev/null
+++ b/pkg/util/test/object_storage_adapter.go
@@ -0,0 +1,100 @@
/*
Copyright 2017 Heptio Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Code generated by mockery v1.0.0. DO NOT EDIT.

package test

import io "io"
import mock "github.com/stretchr/testify/mock"

// ObjectStorageAdapter is an autogenerated mock type for the ObjectStorageAdapter type
type ObjectStorageAdapter struct {
	mock.Mock
}

// DeleteObject provides a mock function with given fields: bucket, key
func (_m *ObjectStorageAdapter) DeleteObject(bucket string, key string) error {
	ret := _m.Called(bucket, key)

	var r0 error
	if rf, ok := ret.Get(0).(func(string, string) error); ok {
		r0 = rf(bucket, key)
	} else {
		r0 = ret.Error(0)
	}

	return r0
}

// GetObject provides a mock function with given fields: bucket, key
func (_m *ObjectStorageAdapter) GetObject(bucket string, key string) (io.ReadCloser, error) {
	ret := _m.Called(bucket, key)

	var r0 io.ReadCloser
	if rf, ok := ret.Get(0).(func(string, string) io.ReadCloser); ok {
		r0 = rf(bucket, key)
	} else {
		if ret.Get(0) != nil {
			r0 = ret.Get(0).(io.ReadCloser)
		}
	}

	var r1 error
	if rf, ok := ret.Get(1).(func(string, string) error); ok {
		r1 = rf(bucket, key)
	} else {
		r1 = ret.Error(1)
	}

	return r0, r1
}

// ListCommonPrefixes provides a mock function with given fields: bucket, delimiter
func (_m *ObjectStorageAdapter) ListCommonPrefixes(bucket string, delimiter string) ([]string, error) {
	ret := _m.Called(bucket, delimiter)

	var r0 []string
	if rf, ok := ret.Get(0).(func(string, string) []string); ok {
		r0 = rf(bucket, delimiter)
	} else {
		if ret.Get(0) != nil {
			r0 = ret.Get(0).([]string)
		}
	}

	var r1 error
	if rf, ok := ret.Get(1).(func(string, string) error); ok {
		r1 = rf(bucket, delimiter)
	} else {
		r1 = ret.Error(1)
	}

	return r0, r1
}

// PutObject provides a mock function with given fields: bucket, key, body
func (_m *ObjectStorageAdapter) PutObject(bucket string, key string, body io.ReadSeeker) error {
	ret := _m.Called(bucket, key, body)

	var r0 error
	if rf, ok := ret.Get(0).(func(string, string, io.ReadSeeker) error); ok {
		r0 = rf(bucket, key, body)
	} else {
		r0 = ret.Error(0)
	}

	return r0
}