diff --git a/pkg/plugin/clientmgmt/client_builder.go b/pkg/plugin/clientmgmt/client_builder.go index 769487417..3432e4110 100644 --- a/pkg/plugin/clientmgmt/client_builder.go +++ b/pkg/plugin/clientmgmt/client_builder.go @@ -1,5 +1,5 @@ /* -Copyright 2020 the Velero contributors. +Copyright 2021 the Velero contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/plugin/clientmgmt/restartable_adapted_v1_backup_item_action.go b/pkg/plugin/clientmgmt/restartable_adapted_v1_backup_item_action.go index 06cf597d9..258f93450 100644 --- a/pkg/plugin/clientmgmt/restartable_adapted_v1_backup_item_action.go +++ b/pkg/plugin/clientmgmt/restartable_adapted_v1_backup_item_action.go @@ -1,5 +1,5 @@ /* -Copyright 2018, 2021 the Velero contributors. +Copyright 2021 the Velero contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -44,8 +44,8 @@ func newAdaptedV1BackupItemAction( return r } -// getBackupItemAction returns the backup item action for this restartableAdaptedV1BackupItemAction. It does *not* restart the -// plugin process. +// getBackupItemAction returns the backup item action for this restartableAdaptedV1BackupItemAction. +// It does *not* restart the plugin process. func (r *restartableAdaptedV1BackupItemAction) getBackupItemAction() (backupitemactionv1.BackupItemAction, error) { plugin, err := r.sharedPluginProcess.getByKindAndName(r.key) if err != nil { @@ -60,7 +60,8 @@ func (r *restartableAdaptedV1BackupItemAction) getBackupItemAction() (backupitem return backupItemAction, nil } -// getDelegate restarts the plugin process (if needed) and returns the backup item action for this restartableAdaptedV1BackupItemAction. +// getDelegate restarts the plugin process (if needed) and returns the backup item +// action for this restartableAdaptedV1BackupItemAction. func (r *restartableAdaptedV1BackupItemAction) getDelegate() (backupitemactionv1.BackupItemAction, error) { if err := r.sharedPluginProcess.resetIfNeeded(); err != nil { return nil, err @@ -80,7 +81,8 @@ func (r *restartableAdaptedV1BackupItemAction) AppliesTo() (velero.ResourceSelec } // Execute restarts the plugin's process if needed, then delegates the call. -func (r *restartableAdaptedV1BackupItemAction) Execute(item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, error) { +func (r *restartableAdaptedV1BackupItemAction) Execute( + item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, error) { delegate, err := r.getDelegate() if err != nil { return nil, nil, err @@ -91,7 +93,10 @@ func (r *restartableAdaptedV1BackupItemAction) Execute(item runtime.Unstructured // Version 2: simply discard ctx and call version 1 function. // ExecuteV2 restarts the plugin's process if needed, then delegates the call. -func (r *restartableAdaptedV1BackupItemAction) ExecuteV2(ctx context.Context, item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, error) { +func (r *restartableAdaptedV1BackupItemAction) ExecuteV2( + ctx context.Context, item runtime.Unstructured, backup *api.Backup) ( + runtime.Unstructured, []velero.ResourceIdentifier, error) { + delegate, err := r.getDelegate() if err != nil { return nil, nil, err diff --git a/pkg/plugin/clientmgmt/restartable_adapted_v1_object_store.go b/pkg/plugin/clientmgmt/restartable_adapted_v1_object_store.go index 7aa10d524..3e9c820a1 100644 --- a/pkg/plugin/clientmgmt/restartable_adapted_v1_object_store.go +++ b/pkg/plugin/clientmgmt/restartable_adapted_v1_object_store.go @@ -58,8 +58,8 @@ func (r *restartableAdaptedV1ObjectStore) reinitialize(dispensed interface{}) er return r.init(objectStore, r.config) } -// getObjectStore returns the object store for this restartableObjectStore. It does *not* restart the -// plugin process. +// getObjectStore returns the object store for this restartableObjectStore. +// It does *not* restart the plugin process. func (r *restartableAdaptedV1ObjectStore) getObjectStore() (objectstorev1.ObjectStore, error) { plugin, err := r.sharedPluginProcess.getByKindAndName(r.key) if err != nil { @@ -139,7 +139,8 @@ func (r *restartableAdaptedV1ObjectStore) GetObject(bucket string, key string) ( } // ListCommonPrefixes restarts the plugin's process if needed, then delegates the call. -func (r *restartableAdaptedV1ObjectStore) ListCommonPrefixes(bucket string, prefix string, delimiter string) ([]string, error) { +func (r *restartableAdaptedV1ObjectStore) ListCommonPrefixes( + bucket string, prefix string, delimiter string) ([]string, error) { delegate, err := r.getDelegate() if err != nil { return nil, err @@ -166,7 +167,8 @@ func (r *restartableAdaptedV1ObjectStore) DeleteObject(bucket string, key string } // CreateSignedURL restarts the plugin's process if needed, then delegates the call. -func (r *restartableAdaptedV1ObjectStore) CreateSignedURL(bucket string, key string, ttl time.Duration) (string, error) { +func (r *restartableAdaptedV1ObjectStore) CreateSignedURL( + bucket string, key string, ttl time.Duration) (string, error) { delegate, err := r.getDelegate() if err != nil { return "", err @@ -176,7 +178,8 @@ func (r *restartableAdaptedV1ObjectStore) CreateSignedURL(bucket string, key str // Version 2. Simply discard ctx. // PutObjectV2 restarts the plugin's process if needed, then delegates the call. -func (r *restartableAdaptedV1ObjectStore) PutObjectV2(ctx context.Context, bucket string, key string, body io.Reader) error { +func (r *restartableAdaptedV1ObjectStore) PutObjectV2( + ctx context.Context, bucket string, key string, body io.Reader) error { delegate, err := r.getDelegate() if err != nil { return err @@ -194,7 +197,8 @@ func (r *restartableAdaptedV1ObjectStore) ObjectExistsV2(ctx context.Context, bu } // GetObjectV2 restarts the plugin's process if needed, then delegates the call. -func (r *restartableAdaptedV1ObjectStore) GetObjectV2(ctx context.Context, bucket string, key string) (io.ReadCloser, error) { +func (r *restartableAdaptedV1ObjectStore) GetObjectV2( + ctx context.Context, bucket string, key string) (io.ReadCloser, error) { delegate, err := r.getDelegate() if err != nil { return nil, err @@ -213,7 +217,8 @@ func (r *restartableAdaptedV1ObjectStore) ListCommonPrefixesV2( } // ListObjectsV2 restarts the plugin's process if needed, then delegates the call. -func (r *restartableAdaptedV1ObjectStore) ListObjectsV2(ctx context.Context, bucket string, prefix string) ([]string, error) { +func (r *restartableAdaptedV1ObjectStore) ListObjectsV2( + ctx context.Context, bucket string, prefix string) ([]string, error) { delegate, err := r.getDelegate() if err != nil { return nil, err @@ -231,7 +236,8 @@ func (r *restartableAdaptedV1ObjectStore) DeleteObjectV2(ctx context.Context, bu } // CreateSignedURLV2 restarts the plugin's process if needed, then delegates the call. -func (r *restartableAdaptedV1ObjectStore) CreateSignedURLV2(ctx context.Context, bucket string, key string, ttl time.Duration) (string, error) { +func (r *restartableAdaptedV1ObjectStore) CreateSignedURLV2( + ctx context.Context, bucket string, key string, ttl time.Duration) (string, error) { delegate, err := r.getDelegate() if err != nil { return "", err diff --git a/pkg/plugin/clientmgmt/restartable_adapted_v1_restore_item_action.go b/pkg/plugin/clientmgmt/restartable_adapted_v1_restore_item_action.go index 5b2a8c27d..4c263bb44 100644 --- a/pkg/plugin/clientmgmt/restartable_adapted_v1_restore_item_action.go +++ b/pkg/plugin/clientmgmt/restartable_adapted_v1_restore_item_action.go @@ -43,8 +43,8 @@ func newAdaptedV1RestoreItemAction( return r } -// getRestoreItemAction returns the restore item action for this restartableRestoreItemAction. It does *not* restart the -// plugin process. +// getRestoreItemAction returns the restore item action for this restartableRestoreItemAction. +// It does *not* restart the plugin process. func (r *restartableAdaptedV1RestoreItemAction) getRestoreItemAction() (restoreitemactionv1.RestoreItemAction, error) { plugin, err := r.sharedPluginProcess.getByKindAndName(r.key) if err != nil { diff --git a/pkg/plugin/framework/backup_item_action_client.go b/pkg/plugin/framework/backup_item_action_client.go index 4537afc18..6d6db1a19 100644 --- a/pkg/plugin/framework/backup_item_action_client.go +++ b/pkg/plugin/framework/backup_item_action_client.go @@ -76,48 +76,7 @@ func (c *BackupItemActionGRPCClient) AppliesTo() (velero.ResourceSelector, error } func (c *BackupItemActionGRPCClient) Execute(item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, error) { - itemJSON, err := json.Marshal(item.UnstructuredContent()) - if err != nil { - return nil, nil, errors.WithStack(err) - } - - backupJSON, err := json.Marshal(backup) - if err != nil { - return nil, nil, errors.WithStack(err) - } - - req := &proto.ExecuteRequest{ - Plugin: c.plugin, - Item: itemJSON, - Backup: backupJSON, - } - - res, err := c.grpcClient.Execute(context.Background(), req) - if err != nil { - return nil, nil, fromGRPCError(err) - } - - var updatedItem unstructured.Unstructured - if err := json.Unmarshal(res.Item, &updatedItem); err != nil { - return nil, nil, errors.WithStack(err) - } - - var additionalItems []velero.ResourceIdentifier - - for _, itm := range res.AdditionalItems { - newItem := velero.ResourceIdentifier{ - GroupResource: schema.GroupResource{ - Group: itm.Group, - Resource: itm.Resource, - }, - Namespace: itm.Namespace, - Name: itm.Name, - } - - additionalItems = append(additionalItems, newItem) - } - - return &updatedItem, additionalItems, nil + return c.ExecuteV2(context.Background(), item, backup) } func (c *BackupItemActionGRPCClient) ExecuteV2( diff --git a/pkg/plugin/framework/delete_item_action_client.go b/pkg/plugin/framework/delete_item_action_client.go index 3ee7e3426..53e5af66b 100644 --- a/pkg/plugin/framework/delete_item_action_client.go +++ b/pkg/plugin/framework/delete_item_action_client.go @@ -71,28 +71,7 @@ func (c *DeleteItemActionGRPCClient) AppliesTo() (velero.ResourceSelector, error } func (c *DeleteItemActionGRPCClient) Execute(input *velero.DeleteItemActionExecuteInput) error { - itemJSON, err := json.Marshal(input.Item.UnstructuredContent()) - if err != nil { - return errors.WithStack(err) - } - - backupJSON, err := json.Marshal(input.Backup) - if err != nil { - return errors.WithStack(err) - } - - req := &proto.DeleteItemActionExecuteRequest{ - Plugin: c.plugin, - Item: itemJSON, - Backup: backupJSON, - } - - // First return item is just an empty struct no matter what. - if _, err = c.grpcClient.Execute(context.Background(), req); err != nil { - return fromGRPCError(err) - } - - return nil + return c.ExecuteV2(context.Background(), input) } func (c *DeleteItemActionGRPCClient) ExecuteV2(ctx context.Context, input *velero.DeleteItemActionExecuteInput) error { diff --git a/pkg/plugin/framework/object_store_client.go b/pkg/plugin/framework/object_store_client.go index 3557fdafd..cc2d26b84 100644 --- a/pkg/plugin/framework/object_store_client.go +++ b/pkg/plugin/framework/object_store_client.go @@ -69,157 +69,9 @@ func (c *ObjectStoreGRPCClient) Init(config map[string]string) error { // PutObject creates a new object using the data in body within the specified // object storage bucket with the given key. func (c *ObjectStoreGRPCClient) PutObject(bucket, key string, body io.Reader) error { - stream, err := c.grpcClient.PutObject(context.Background()) - if err != nil { - return fromGRPCError(err) - } - - // read from the provider io.Reader into chunks, and send each one over - // the gRPC stream - chunk := make([]byte, byteChunkSize) - for { - n, err := body.Read(chunk) - if err == io.EOF { - if _, resErr := stream.CloseAndRecv(); resErr != nil { - return fromGRPCError(resErr) - } - return nil - } - if err != nil { - stream.CloseSend() - return errors.WithStack(err) - } - - if err := stream.Send(&proto.PutObjectRequest{Plugin: c.plugin, Bucket: bucket, Key: key, Body: chunk[0:n]}); err != nil { - return fromGRPCError(err) - } - } + return c.PutObjectV2(context.Background(), bucket, key, body) } -// ObjectExists checks if there is an object with the given key in the object storage bucket. -func (c *ObjectStoreGRPCClient) ObjectExists(bucket, key string) (bool, error) { - req := &proto.ObjectExistsRequest{ - Plugin: c.plugin, - Bucket: bucket, - Key: key, - } - - res, err := c.grpcClient.ObjectExists(context.Background(), req) - if err != nil { - return false, err - } - - return res.Exists, nil -} - -// GetObject retrieves the object with the given key from the specified -// bucket in object storage. -func (c *ObjectStoreGRPCClient) GetObject(bucket, key string) (io.ReadCloser, error) { - req := &proto.GetObjectRequest{ - Plugin: c.plugin, - Bucket: bucket, - Key: key, - } - - stream, err := c.grpcClient.GetObject(context.Background(), req) - if err != nil { - return nil, fromGRPCError(err) - } - - receive := func() ([]byte, error) { - data, err := stream.Recv() - if err == io.EOF { - // we need to return io.EOF errors unwrapped so that - // calling code sees them as io.EOF and knows to stop - // reading. - return nil, err - } - if err != nil { - return nil, fromGRPCError(err) - } - - return data.Data, nil - } - - close := func() error { - if err := stream.CloseSend(); err != nil { - return fromGRPCError(err) - } - return nil - } - - return &StreamReadCloser{receive: receive, close: close}, nil -} - -// ListCommonPrefixes gets a list of all object key prefixes that come -// after the provided prefix and before the provided delimiter (this is -// often used to simulate a directory hierarchy in object storage). -func (c *ObjectStoreGRPCClient) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) { - req := &proto.ListCommonPrefixesRequest{ - Plugin: c.plugin, - Bucket: bucket, - Prefix: prefix, - Delimiter: delimiter, - } - - res, err := c.grpcClient.ListCommonPrefixes(context.Background(), req) - if err != nil { - return nil, fromGRPCError(err) - } - - return res.Prefixes, nil -} - -// ListObjects gets a list of all objects in bucket that have the same prefix. -func (c *ObjectStoreGRPCClient) ListObjects(bucket, prefix string) ([]string, error) { - req := &proto.ListObjectsRequest{ - Plugin: c.plugin, - Bucket: bucket, - Prefix: prefix, - } - - res, err := c.grpcClient.ListObjects(context.Background(), req) - if err != nil { - return nil, fromGRPCError(err) - } - - return res.Keys, nil -} - -// DeleteObject removes object with the specified key from the given -// bucket. -func (c *ObjectStoreGRPCClient) DeleteObject(bucket, key string) error { - req := &proto.DeleteObjectRequest{ - Plugin: c.plugin, - Bucket: bucket, - Key: key, - } - - if _, err := c.grpcClient.DeleteObject(context.Background(), req); err != nil { - return fromGRPCError(err) - } - - return nil -} - -// CreateSignedURL creates a pre-signed URL for the given bucket and key that expires after ttl. -func (c *ObjectStoreGRPCClient) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) { - req := &proto.CreateSignedURLRequest{ - Plugin: c.plugin, - Bucket: bucket, - Key: key, - Ttl: int64(ttl), - } - - res, err := c.grpcClient.CreateSignedURL(context.Background(), req) - if err != nil { - return "", fromGRPCError(err) - } - - return res.Url, nil -} - -// Version 2 // PutObjectV2 creates a new object using the data in body within the specified // object storage bucket with the given key. func (c *ObjectStoreGRPCClient) PutObjectV2(ctx context.Context, bucket, key string, body io.Reader) error { @@ -250,6 +102,11 @@ func (c *ObjectStoreGRPCClient) PutObjectV2(ctx context.Context, bucket, key str } } +// ObjectExists checks if there is an object with the given key in the object storage bucket. +func (c *ObjectStoreGRPCClient) ObjectExists(bucket, key string) (bool, error) { + return c.ObjectExistsV2(context.Background(), bucket, key) +} + // ObjectExistsV2 checks if there is an object with the given key in the object storage bucket. func (c *ObjectStoreGRPCClient) ObjectExistsV2(ctx context.Context, bucket, key string) (bool, error) { req := &proto.ObjectExistsRequest{ @@ -266,6 +123,12 @@ func (c *ObjectStoreGRPCClient) ObjectExistsV2(ctx context.Context, bucket, key return res.Exists, nil } +// GetObject retrieves the object with the given key from the specified +// bucket in object storage. +func (c *ObjectStoreGRPCClient) GetObject(bucket, key string) (io.ReadCloser, error) { + return c.GetObjectV2(context.Background(), bucket, key) +} + // GetObjectV2 retrieves the object with the given key from the specified // bucket in object storage. func (c *ObjectStoreGRPCClient) GetObjectV2(ctx context.Context, bucket, key string) (io.ReadCloser, error) { @@ -305,6 +168,13 @@ func (c *ObjectStoreGRPCClient) GetObjectV2(ctx context.Context, bucket, key str return &StreamReadCloser{receive: receive, close: close}, nil } +// ListCommonPrefixes gets a list of all object key prefixes that come +// after the provided prefix and before the provided delimiter (this is +// often used to simulate a directory hierarchy in object storage). +func (c *ObjectStoreGRPCClient) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) { + return c.ListCommonPrefixesV2(context.Background(), bucket, prefix, delimiter) +} + // ListCommonPrefixesV2 gets a list of all object key prefixes that come // after the provided prefix and before the provided delimiter (this is // often used to simulate a directory hierarchy in object storage). @@ -325,6 +195,11 @@ func (c *ObjectStoreGRPCClient) ListCommonPrefixesV2( return res.Prefixes, nil } +// ListObjects gets a list of all objects in bucket that have the same prefix. +func (c *ObjectStoreGRPCClient) ListObjects(bucket, prefix string) ([]string, error) { + return c.ListObjectsV2(context.Background(), bucket, prefix) +} + // ListObjectsV2 gets a list of all objects in bucket that have the same prefix. func (c *ObjectStoreGRPCClient) ListObjectsV2( ctx context.Context, bucket, prefix string) ([]string, error) { @@ -342,8 +217,13 @@ func (c *ObjectStoreGRPCClient) ListObjectsV2( return res.Keys, nil } -// DeleteObjectV2 removes object with the specified key from the given +// DeleteObject removes object with the specified key from the given // bucket. +func (c *ObjectStoreGRPCClient) DeleteObject(bucket, key string) error { + return c.DeleteObjectV2(context.Background(), bucket, key) +} + +// DeleteObjectV2 removes object with the specified key from the given bucket. func (c *ObjectStoreGRPCClient) DeleteObjectV2( ctx context.Context, bucket, key string) error { req := &proto.DeleteObjectRequest{ @@ -359,6 +239,11 @@ func (c *ObjectStoreGRPCClient) DeleteObjectV2( return nil } +// CreateSignedURL creates a pre-signed URL for the given bucket and key that expires after ttl. +func (c *ObjectStoreGRPCClient) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) { + return c.CreateSignedURLV2(context.Background(), bucket, key, ttl) +} + // CreateSignedURLV2 creates a pre-signed URL for the given bucket and key that expires after ttl. func (c *ObjectStoreGRPCClient) CreateSignedURLV2( ctx context.Context, bucket, key string, ttl time.Duration) (string, error) { diff --git a/pkg/plugin/framework/server.go b/pkg/plugin/framework/server.go index 314b084e3..398531a2c 100644 --- a/pkg/plugin/framework/server.go +++ b/pkg/plugin/framework/server.go @@ -149,6 +149,11 @@ func NewServer() Server { objectStore: NewObjectStorePlugin(serverLogger(log)), restoreItemAction: NewRestoreItemActionPlugin(serverLogger(log)), deleteItemAction: NewDeleteItemActionPlugin(serverLogger(log)), + backupItemActionV2: NewBackupItemActionPlugin(serverLogger(log)), + volumeSnapshotterV2: NewVolumeSnapshotterPlugin(serverLogger(log)), + objectStoreV2: NewObjectStorePlugin(serverLogger(log)), + restoreItemActionV2: NewRestoreItemActionPlugin(serverLogger(log)), + deleteItemActionV2: NewDeleteItemActionPlugin(serverLogger(log)), } } @@ -330,8 +335,6 @@ func (s *server) Serve() { string(PluginKindRestoreItemAction): s.restoreItemAction, string(PluginKindDeleteItemAction): s.deleteItemAction, // Version 2 - // TODO: check to see if need pluginLister for V2 - // string(PluginKindPluginLister): NewPluginListerPlugin(pluginLister), string(PluginKindBackupItemActionV2): s.backupItemActionV2, string(PluginKindVolumeSnapshotterV2): s.volumeSnapshotterV2, string(PluginKindObjectStoreV2): s.objectStoreV2,